blob: 3a1f5ba5b6663d5cea385179dc46ad3fb0336dba [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Gregory P. Smithe5796c42018-12-30 20:17:57 -080031import sysconfig
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020032import threading
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000033import time
Guido van Rossum28524c72007-02-27 05:47:44 +000034import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000035import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020037from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020038from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000039from test import support
Berker Peksagce643912015-05-06 06:33:17 +030040from test.support.script_helper import assert_python_ok, run_python_until_end
Serhiy Storchakab21d1552018-03-02 11:53:51 +020041from test.support import FakePath
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000042
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000043import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000044import io # C implementation of io
45import _pyio as pyio # Python implementation of io
Guido van Rossuma9e20242007-03-08 00:43:48 +000046
Martin Panter6bb91f32016-05-28 00:41:57 +000047try:
48 import ctypes
49except ImportError:
50 def byteslike(*pos, **kw):
51 return array.array("b", bytes(*pos, **kw))
52else:
53 def byteslike(*pos, **kw):
54 """Create a bytes-like object having no string or sequence methods"""
55 data = bytes(*pos, **kw)
56 obj = EmptyStruct()
57 ctypes.resize(obj, len(data))
58 memoryview(obj).cast("B")[:] = data
59 return obj
60 class EmptyStruct(ctypes.Structure):
61 pass
62
Gregory P. Smithe5796c42018-12-30 20:17:57 -080063_cflags = sysconfig.get_config_var('CFLAGS') or ''
64_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
65MEMORY_SANITIZER = (
66 '-fsanitize=memory' in _cflags or
67 '--with-memory-sanitizer' in _config_args
68)
69
Victor Stinnerbc2aa812019-05-23 03:45:09 +020070# Does io.IOBase finalizer log the exception if the close() method fails?
71# The exception is ignored silently by default in release build.
72IOBASE_EMITS_UNRAISABLE = (hasattr(sys, "gettotalrefcount") or sys.flags.dev_mode)
Victor Stinner44235042019-04-12 17:06:47 +020073
74
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000075def _default_chunk_size():
76 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000077 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000078 return f._CHUNK_SIZE
79
80
Antoine Pitrou328ec742010-09-14 18:37:24 +000081class MockRawIOWithoutRead:
82 """A RawIO implementation without read(), so as to exercise the default
83 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000084
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000085 def __init__(self, read_stack=()):
86 self._read_stack = list(read_stack)
87 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000088 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000089 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000090
Guido van Rossum01a27522007-03-07 01:00:12 +000091 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000092 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000093 return len(b)
94
95 def writable(self):
96 return True
97
Guido van Rossum68bbcd22007-02-27 17:19:33 +000098 def fileno(self):
99 return 42
100
101 def readable(self):
102 return True
103
Guido van Rossum01a27522007-03-07 01:00:12 +0000104 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000105 return True
106
Guido van Rossum01a27522007-03-07 01:00:12 +0000107 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000108 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +0000109
110 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000111 return 0 # same comment as above
112
113 def readinto(self, buf):
114 self._reads += 1
115 max_len = len(buf)
116 try:
117 data = self._read_stack[0]
118 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000119 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000120 return 0
121 if data is None:
122 del self._read_stack[0]
123 return None
124 n = len(data)
125 if len(data) <= max_len:
126 del self._read_stack[0]
127 buf[:n] = data
128 return n
129 else:
130 buf[:] = data[:max_len]
131 self._read_stack[0] = data[max_len:]
132 return max_len
133
134 def truncate(self, pos=None):
135 return pos
136
Antoine Pitrou328ec742010-09-14 18:37:24 +0000137class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
138 pass
139
140class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
141 pass
142
143
144class MockRawIO(MockRawIOWithoutRead):
145
146 def read(self, n=None):
147 self._reads += 1
148 try:
149 return self._read_stack.pop(0)
150 except:
151 self._extraneous_reads += 1
152 return b""
153
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000154class CMockRawIO(MockRawIO, io.RawIOBase):
155 pass
156
157class PyMockRawIO(MockRawIO, pyio.RawIOBase):
158 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000159
Guido van Rossuma9e20242007-03-08 00:43:48 +0000160
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000161class MisbehavedRawIO(MockRawIO):
162 def write(self, b):
163 return super().write(b) * 2
164
165 def read(self, n=None):
166 return super().read(n) * 2
167
168 def seek(self, pos, whence):
169 return -123
170
171 def tell(self):
172 return -456
173
174 def readinto(self, buf):
175 super().readinto(buf)
176 return len(buf) * 5
177
178class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
179 pass
180
181class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
182 pass
183
184
benfogle9703f092017-11-10 16:03:40 -0500185class SlowFlushRawIO(MockRawIO):
186 def __init__(self):
187 super().__init__()
188 self.in_flush = threading.Event()
189
190 def flush(self):
191 self.in_flush.set()
192 time.sleep(0.25)
193
194class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
195 pass
196
197class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
198 pass
199
200
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000201class CloseFailureIO(MockRawIO):
202 closed = 0
203
204 def close(self):
205 if not self.closed:
206 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200207 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000208
209class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
210 pass
211
212class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
213 pass
214
215
216class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000217
218 def __init__(self, data):
219 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000220 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000221
222 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000223 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000224 self.read_history.append(None if res is None else len(res))
225 return res
226
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000227 def readinto(self, b):
228 res = super().readinto(b)
229 self.read_history.append(res)
230 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000231
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000232class CMockFileIO(MockFileIO, io.BytesIO):
233 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000234
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000235class PyMockFileIO(MockFileIO, pyio.BytesIO):
236 pass
237
238
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000239class MockUnseekableIO:
240 def seekable(self):
241 return False
242
243 def seek(self, *args):
244 raise self.UnsupportedOperation("not seekable")
245
246 def tell(self, *args):
247 raise self.UnsupportedOperation("not seekable")
248
Martin Panter754aab22016-03-31 07:21:56 +0000249 def truncate(self, *args):
250 raise self.UnsupportedOperation("not seekable")
251
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000252class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
253 UnsupportedOperation = io.UnsupportedOperation
254
255class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
256 UnsupportedOperation = pyio.UnsupportedOperation
257
258
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000259class MockNonBlockWriterIO:
260
261 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000262 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000263 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000264
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000265 def pop_written(self):
266 s = b"".join(self._write_stack)
267 self._write_stack[:] = []
268 return s
269
270 def block_on(self, char):
271 """Block when a given char is encountered."""
272 self._blocker_char = char
273
274 def readable(self):
275 return True
276
277 def seekable(self):
278 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000279
Guido van Rossum01a27522007-03-07 01:00:12 +0000280 def writable(self):
281 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000282
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000283 def write(self, b):
284 b = bytes(b)
285 n = -1
286 if self._blocker_char:
287 try:
288 n = b.index(self._blocker_char)
289 except ValueError:
290 pass
291 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100292 if n > 0:
293 # write data up to the first blocker
294 self._write_stack.append(b[:n])
295 return n
296 else:
297 # cancel blocker and indicate would block
298 self._blocker_char = None
299 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000300 self._write_stack.append(b)
301 return len(b)
302
303class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
304 BlockingIOError = io.BlockingIOError
305
306class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
307 BlockingIOError = pyio.BlockingIOError
308
Guido van Rossuma9e20242007-03-08 00:43:48 +0000309
Guido van Rossum28524c72007-02-27 05:47:44 +0000310class IOTest(unittest.TestCase):
311
Neal Norwitze7789b12008-03-24 06:18:09 +0000312 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000313 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000314
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000315 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000316 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000317
Guido van Rossum28524c72007-02-27 05:47:44 +0000318 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000319 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000320 f.truncate(0)
321 self.assertEqual(f.tell(), 5)
322 f.seek(0)
323
324 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000325 self.assertEqual(f.seek(0), 0)
326 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000327 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000328 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000329 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000330 buffer = bytearray(b" world\n\n\n")
331 self.assertEqual(f.write(buffer), 9)
332 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000333 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000334 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000335 self.assertEqual(f.seek(-1, 2), 13)
336 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000337
Guido van Rossum87429772007-04-10 21:06:59 +0000338 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000339 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000340 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000341
Guido van Rossum9b76da62007-04-11 01:09:03 +0000342 def read_ops(self, f, buffered=False):
343 data = f.read(5)
344 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000345 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000346 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000347 self.assertEqual(bytes(data), b" worl")
348 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000349 self.assertEqual(f.readinto(data), 2)
350 self.assertEqual(len(data), 5)
351 self.assertEqual(data[:2], b"d\n")
352 self.assertEqual(f.seek(0), 0)
353 self.assertEqual(f.read(20), b"hello world\n")
354 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000355 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000356 self.assertEqual(f.seek(-6, 2), 6)
357 self.assertEqual(f.read(5), b"world")
358 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000359 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000360 self.assertEqual(f.seek(-6, 1), 5)
361 self.assertEqual(f.read(5), b" worl")
362 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000363 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000364 if buffered:
365 f.seek(0)
366 self.assertEqual(f.read(), b"hello world\n")
367 f.seek(6)
368 self.assertEqual(f.read(), b"world\n")
369 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000370 f.seek(0)
371 data = byteslike(5)
372 self.assertEqual(f.readinto1(data), 5)
373 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000374
Guido van Rossum34d69e52007-04-10 20:08:41 +0000375 LARGE = 2**31
376
Guido van Rossum53807da2007-04-10 19:01:47 +0000377 def large_file_ops(self, f):
378 assert f.readable()
379 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100380 try:
381 self.assertEqual(f.seek(self.LARGE), self.LARGE)
382 except (OverflowError, ValueError):
383 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000384 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000385 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000386 self.assertEqual(f.tell(), self.LARGE + 3)
387 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000388 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000389 self.assertEqual(f.tell(), self.LARGE + 2)
390 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000391 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000392 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000393 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
394 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000395 self.assertEqual(f.read(2), b"x")
396
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000397 def test_invalid_operations(self):
398 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000399 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000400 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000401 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000402 self.assertRaises(exc, fp.read)
403 self.assertRaises(exc, fp.readline)
404 with self.open(support.TESTFN, "wb", buffering=0) as fp:
405 self.assertRaises(exc, fp.read)
406 self.assertRaises(exc, fp.readline)
407 with self.open(support.TESTFN, "rb", buffering=0) as fp:
408 self.assertRaises(exc, fp.write, b"blah")
409 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000410 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000411 self.assertRaises(exc, fp.write, b"blah")
412 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000413 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000414 self.assertRaises(exc, fp.write, "blah")
415 self.assertRaises(exc, fp.writelines, ["blah\n"])
416 # Non-zero seeking from current or end pos
417 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
418 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000419
Martin Panter754aab22016-03-31 07:21:56 +0000420 def test_optional_abilities(self):
421 # Test for OSError when optional APIs are not supported
422 # The purpose of this test is to try fileno(), reading, writing and
423 # seeking operations with various objects that indicate they do not
424 # support these operations.
425
426 def pipe_reader():
427 [r, w] = os.pipe()
428 os.close(w) # So that read() is harmless
429 return self.FileIO(r, "r")
430
431 def pipe_writer():
432 [r, w] = os.pipe()
433 self.addCleanup(os.close, r)
434 # Guarantee that we can write into the pipe without blocking
435 thread = threading.Thread(target=os.read, args=(r, 100))
436 thread.start()
437 self.addCleanup(thread.join)
438 return self.FileIO(w, "w")
439
440 def buffered_reader():
441 return self.BufferedReader(self.MockUnseekableIO())
442
443 def buffered_writer():
444 return self.BufferedWriter(self.MockUnseekableIO())
445
446 def buffered_random():
447 return self.BufferedRandom(self.BytesIO())
448
449 def buffered_rw_pair():
450 return self.BufferedRWPair(self.MockUnseekableIO(),
451 self.MockUnseekableIO())
452
453 def text_reader():
454 class UnseekableReader(self.MockUnseekableIO):
455 writable = self.BufferedIOBase.writable
456 write = self.BufferedIOBase.write
457 return self.TextIOWrapper(UnseekableReader(), "ascii")
458
459 def text_writer():
460 class UnseekableWriter(self.MockUnseekableIO):
461 readable = self.BufferedIOBase.readable
462 read = self.BufferedIOBase.read
463 return self.TextIOWrapper(UnseekableWriter(), "ascii")
464
465 tests = (
466 (pipe_reader, "fr"), (pipe_writer, "fw"),
467 (buffered_reader, "r"), (buffered_writer, "w"),
468 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
469 (text_reader, "r"), (text_writer, "w"),
470 (self.BytesIO, "rws"), (self.StringIO, "rws"),
471 )
472 for [test, abilities] in tests:
Martin Panter754aab22016-03-31 07:21:56 +0000473 with self.subTest(test), test() as obj:
474 readable = "r" in abilities
475 self.assertEqual(obj.readable(), readable)
476 writable = "w" in abilities
477 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000478
479 if isinstance(obj, self.TextIOBase):
480 data = "3"
481 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
482 data = b"3"
483 else:
484 self.fail("Unknown base class")
485
486 if "f" in abilities:
487 obj.fileno()
488 else:
489 self.assertRaises(OSError, obj.fileno)
490
491 if readable:
492 obj.read(1)
493 obj.read()
494 else:
495 self.assertRaises(OSError, obj.read, 1)
496 self.assertRaises(OSError, obj.read)
497
498 if writable:
499 obj.write(data)
500 else:
501 self.assertRaises(OSError, obj.write, data)
502
Martin Panter3ee147f2016-03-31 21:05:31 +0000503 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000504 pipe_reader, pipe_writer):
505 # Pipes seem to appear as seekable on Windows
506 continue
507 seekable = "s" in abilities
508 self.assertEqual(obj.seekable(), seekable)
509
Martin Panter754aab22016-03-31 07:21:56 +0000510 if seekable:
511 obj.tell()
512 obj.seek(0)
513 else:
514 self.assertRaises(OSError, obj.tell)
515 self.assertRaises(OSError, obj.seek, 0)
516
517 if writable and seekable:
518 obj.truncate()
519 obj.truncate(0)
520 else:
521 self.assertRaises(OSError, obj.truncate)
522 self.assertRaises(OSError, obj.truncate, 0)
523
Antoine Pitrou13348842012-01-29 18:36:34 +0100524 def test_open_handles_NUL_chars(self):
525 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300526 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100527
528 bytes_fn = bytes(fn_with_NUL, 'ascii')
529 with warnings.catch_warnings():
530 warnings.simplefilter("ignore", DeprecationWarning)
531 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100532
Guido van Rossum28524c72007-02-27 05:47:44 +0000533 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000534 with self.open(support.TESTFN, "wb", buffering=0) as f:
535 self.assertEqual(f.readable(), False)
536 self.assertEqual(f.writable(), True)
537 self.assertEqual(f.seekable(), True)
538 self.write_ops(f)
539 with self.open(support.TESTFN, "rb", buffering=0) as f:
540 self.assertEqual(f.readable(), True)
541 self.assertEqual(f.writable(), False)
542 self.assertEqual(f.seekable(), True)
543 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000544
Guido van Rossum87429772007-04-10 21:06:59 +0000545 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000546 with self.open(support.TESTFN, "wb") as f:
547 self.assertEqual(f.readable(), False)
548 self.assertEqual(f.writable(), True)
549 self.assertEqual(f.seekable(), True)
550 self.write_ops(f)
551 with self.open(support.TESTFN, "rb") as f:
552 self.assertEqual(f.readable(), True)
553 self.assertEqual(f.writable(), False)
554 self.assertEqual(f.seekable(), True)
555 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000556
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000557 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000558 with self.open(support.TESTFN, "wb") as f:
559 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
560 with self.open(support.TESTFN, "rb") as f:
561 self.assertEqual(f.readline(), b"abc\n")
562 self.assertEqual(f.readline(10), b"def\n")
563 self.assertEqual(f.readline(2), b"xy")
564 self.assertEqual(f.readline(4), b"zzy\n")
565 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000566 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000567 self.assertRaises(TypeError, f.readline, 5.3)
568 with self.open(support.TESTFN, "r") as f:
569 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000570
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300571 def test_readline_nonsizeable(self):
572 # Issue #30061
573 # Crash when readline() returns an object without __len__
574 class R(self.IOBase):
575 def readline(self):
576 return None
577 self.assertRaises((TypeError, StopIteration), next, R())
578
579 def test_next_nonsizeable(self):
580 # Issue #30061
581 # Crash when __next__() returns an object without __len__
582 class R(self.IOBase):
583 def __next__(self):
584 return None
585 self.assertRaises(TypeError, R().readlines, 1)
586
Guido van Rossum28524c72007-02-27 05:47:44 +0000587 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000588 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000589 self.write_ops(f)
590 data = f.getvalue()
591 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000592 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000593 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000594
Guido van Rossum53807da2007-04-10 19:01:47 +0000595 def test_large_file_ops(self):
Mike53f7a7c2017-12-14 14:04:53 +0300596 # On Windows and Mac OSX this test consumes large resources; It takes
Victor Stinner8c663fd2017-11-08 14:44:44 -0800597 # a long time to build the >2 GiB file and takes >2 GiB of disk space
Guido van Rossum34d69e52007-04-10 20:08:41 +0000598 # therefore the resource must be enabled to run this test.
Victor Stinner937ee9e2018-06-26 02:11:06 +0200599 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600600 support.requires(
601 'largefile',
602 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000603 with self.open(support.TESTFN, "w+b", 0) as f:
604 self.large_file_ops(f)
605 with self.open(support.TESTFN, "w+b") as f:
606 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000607
608 def test_with_open(self):
Alexey Izbysheva2670562018-10-20 03:22:31 +0300609 for bufsize in (0, 100):
Guido van Rossum87429772007-04-10 21:06:59 +0000610 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000611 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000612 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000613 self.assertEqual(f.closed, True)
614 f = None
615 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000616 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000617 1/0
618 except ZeroDivisionError:
619 self.assertEqual(f.closed, True)
620 else:
621 self.fail("1/0 didn't raise an exception")
622
Antoine Pitrou08838b62009-01-21 00:55:13 +0000623 # issue 5008
624 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000625 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000626 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000627 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000628 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000629 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000630 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000631 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300632 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000633
Guido van Rossum87429772007-04-10 21:06:59 +0000634 def test_destructor(self):
635 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000636 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000637 def __del__(self):
638 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000639 try:
640 f = super().__del__
641 except AttributeError:
642 pass
643 else:
644 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000645 def close(self):
646 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000647 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000648 def flush(self):
649 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000650 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000651 with support.check_warnings(('', ResourceWarning)):
652 f = MyFileIO(support.TESTFN, "wb")
653 f.write(b"xxx")
654 del f
655 support.gc_collect()
656 self.assertEqual(record, [1, 2, 3])
657 with self.open(support.TESTFN, "rb") as f:
658 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000659
660 def _check_base_destructor(self, base):
661 record = []
662 class MyIO(base):
663 def __init__(self):
664 # This exercises the availability of attributes on object
665 # destruction.
666 # (in the C version, close() is called by the tp_dealloc
667 # function, not by __del__)
668 self.on_del = 1
669 self.on_close = 2
670 self.on_flush = 3
671 def __del__(self):
672 record.append(self.on_del)
673 try:
674 f = super().__del__
675 except AttributeError:
676 pass
677 else:
678 f()
679 def close(self):
680 record.append(self.on_close)
681 super().close()
682 def flush(self):
683 record.append(self.on_flush)
684 super().flush()
685 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000686 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000687 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000688 self.assertEqual(record, [1, 2, 3])
689
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000690 def test_IOBase_destructor(self):
691 self._check_base_destructor(self.IOBase)
692
693 def test_RawIOBase_destructor(self):
694 self._check_base_destructor(self.RawIOBase)
695
696 def test_BufferedIOBase_destructor(self):
697 self._check_base_destructor(self.BufferedIOBase)
698
699 def test_TextIOBase_destructor(self):
700 self._check_base_destructor(self.TextIOBase)
701
Guido van Rossum87429772007-04-10 21:06:59 +0000702 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000703 with self.open(support.TESTFN, "wb") as f:
704 f.write(b"xxx")
705 with self.open(support.TESTFN, "rb") as f:
706 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000707
Guido van Rossumd4103952007-04-12 05:44:49 +0000708 def test_array_writes(self):
709 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000710 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000711 def check(f):
712 with f:
713 self.assertEqual(f.write(a), n)
714 f.writelines((a,))
715 check(self.BytesIO())
716 check(self.FileIO(support.TESTFN, "w"))
717 check(self.BufferedWriter(self.MockRawIO()))
718 check(self.BufferedRandom(self.MockRawIO()))
719 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000720
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000721 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000722 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000723 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000724
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000725 def test_read_closed(self):
726 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000727 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000728 with self.open(support.TESTFN, "r") as f:
729 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000730 self.assertEqual(file.read(), "egg\n")
731 file.seek(0)
732 file.close()
733 self.assertRaises(ValueError, file.read)
734
735 def test_no_closefd_with_filename(self):
736 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000737 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000738
739 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000740 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000741 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000742 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000743 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000744 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000745 self.assertEqual(file.buffer.raw.closefd, False)
746
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000747 def test_garbage_collection(self):
748 # FileIO objects are collected, and collecting them flushes
749 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000750 with support.check_warnings(('', ResourceWarning)):
751 f = self.FileIO(support.TESTFN, "wb")
752 f.write(b"abcxxx")
753 f.f = f
754 wr = weakref.ref(f)
755 del f
756 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300757 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000758 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000759 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000760
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000761 def test_unbounded_file(self):
762 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
763 zero = "/dev/zero"
764 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000765 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000766 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000767 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000768 if support.real_max_memuse < support._2G:
Victor Stinner8c663fd2017-11-08 14:44:44 -0800769 self.skipTest("test requires at least 2 GiB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000770 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000771 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000772 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000773 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000774 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000775 self.assertRaises(OverflowError, f.read)
776
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200777 def check_flush_error_on_close(self, *args, **kwargs):
778 # Test that the file is closed despite failed flush
779 # and that flush() is called before file closed.
780 f = self.open(*args, **kwargs)
781 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000782 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200783 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200784 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000785 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200786 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600787 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200788 self.assertTrue(closed) # flush() called
789 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200790 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200791
792 def test_flush_error_on_close(self):
793 # raw file
794 # Issue #5700: io.FileIO calls flush() after file closed
795 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
796 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
797 self.check_flush_error_on_close(fd, 'wb', buffering=0)
798 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
799 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
800 os.close(fd)
801 # buffered io
802 self.check_flush_error_on_close(support.TESTFN, 'wb')
803 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
804 self.check_flush_error_on_close(fd, 'wb')
805 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
806 self.check_flush_error_on_close(fd, 'wb', closefd=False)
807 os.close(fd)
808 # text io
809 self.check_flush_error_on_close(support.TESTFN, 'w')
810 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
811 self.check_flush_error_on_close(fd, 'w')
812 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
813 self.check_flush_error_on_close(fd, 'w', closefd=False)
814 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000815
816 def test_multi_close(self):
817 f = self.open(support.TESTFN, "wb", buffering=0)
818 f.close()
819 f.close()
820 f.close()
821 self.assertRaises(ValueError, f.flush)
822
Antoine Pitrou328ec742010-09-14 18:37:24 +0000823 def test_RawIOBase_read(self):
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530824 # Exercise the default limited RawIOBase.read(n) implementation (which
825 # calls readinto() internally).
Antoine Pitrou328ec742010-09-14 18:37:24 +0000826 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
827 self.assertEqual(rawio.read(2), b"ab")
828 self.assertEqual(rawio.read(2), b"c")
829 self.assertEqual(rawio.read(2), b"d")
830 self.assertEqual(rawio.read(2), None)
831 self.assertEqual(rawio.read(2), b"ef")
832 self.assertEqual(rawio.read(2), b"g")
833 self.assertEqual(rawio.read(2), None)
834 self.assertEqual(rawio.read(2), b"")
835
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400836 def test_types_have_dict(self):
837 test = (
838 self.IOBase(),
839 self.RawIOBase(),
840 self.TextIOBase(),
841 self.StringIO(),
842 self.BytesIO()
843 )
844 for obj in test:
845 self.assertTrue(hasattr(obj, "__dict__"))
846
Ross Lagerwall59142db2011-10-31 20:34:46 +0200847 def test_opener(self):
848 with self.open(support.TESTFN, "w") as f:
849 f.write("egg\n")
850 fd = os.open(support.TESTFN, os.O_RDONLY)
851 def opener(path, flags):
852 return fd
853 with self.open("non-existent", "r", opener=opener) as f:
854 self.assertEqual(f.read(), "egg\n")
855
Barry Warsaw480e2852016-06-08 17:47:26 -0400856 def test_bad_opener_negative_1(self):
857 # Issue #27066.
858 def badopener(fname, flags):
859 return -1
860 with self.assertRaises(ValueError) as cm:
861 open('non-existent', 'r', opener=badopener)
862 self.assertEqual(str(cm.exception), 'opener returned -1')
863
864 def test_bad_opener_other_negative(self):
865 # Issue #27066.
866 def badopener(fname, flags):
867 return -2
868 with self.assertRaises(ValueError) as cm:
869 open('non-existent', 'r', opener=badopener)
870 self.assertEqual(str(cm.exception), 'opener returned -2')
871
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200872 def test_fileio_closefd(self):
873 # Issue #4841
874 with self.open(__file__, 'rb') as f1, \
875 self.open(__file__, 'rb') as f2:
876 fileio = self.FileIO(f1.fileno(), closefd=False)
877 # .__init__() must not close f1
878 fileio.__init__(f2.fileno(), closefd=False)
879 f1.readline()
880 # .close() must not close f2
881 fileio.close()
882 f2.readline()
883
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300884 def test_nonbuffered_textio(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200885 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300886 with self.assertRaises(ValueError):
887 self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300888
889 def test_invalid_newline(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200890 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300891 with self.assertRaises(ValueError):
892 self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300893
Martin Panter6bb91f32016-05-28 00:41:57 +0000894 def test_buffered_readinto_mixin(self):
895 # Test the implementation provided by BufferedIOBase
896 class Stream(self.BufferedIOBase):
897 def read(self, size):
898 return b"12345"
899 read1 = read
900 stream = Stream()
901 for method in ("readinto", "readinto1"):
902 with self.subTest(method):
903 buffer = byteslike(5)
904 self.assertEqual(getattr(stream, method)(buffer), 5)
905 self.assertEqual(bytes(buffer), b"12345")
906
Ethan Furmand62548a2016-06-04 14:38:43 -0700907 def test_fspath_support(self):
Ethan Furmand62548a2016-06-04 14:38:43 -0700908 def check_path_succeeds(path):
909 with self.open(path, "w") as f:
910 f.write("egg\n")
911
912 with self.open(path, "r") as f:
913 self.assertEqual(f.read(), "egg\n")
914
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200915 check_path_succeeds(FakePath(support.TESTFN))
916 check_path_succeeds(FakePath(support.TESTFN.encode('utf-8')))
Ethan Furmand62548a2016-06-04 14:38:43 -0700917
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200918 with self.open(support.TESTFN, "w") as f:
919 bad_path = FakePath(f.fileno())
920 with self.assertRaises(TypeError):
921 self.open(bad_path, 'w')
922
923 bad_path = FakePath(None)
Brett Cannonc78ca1e2016-06-24 12:03:43 -0700924 with self.assertRaises(TypeError):
Ethan Furmand62548a2016-06-04 14:38:43 -0700925 self.open(bad_path, 'w')
926
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200927 bad_path = FakePath(FloatingPointError)
928 with self.assertRaises(FloatingPointError):
929 self.open(bad_path, 'w')
930
Ethan Furmand62548a2016-06-04 14:38:43 -0700931 # ensure that refcounting is correct with some error conditions
932 with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200933 self.open(FakePath(support.TESTFN), 'rwxa')
Ethan Furmand62548a2016-06-04 14:38:43 -0700934
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530935 def test_RawIOBase_readall(self):
936 # Exercise the default unlimited RawIOBase.read() and readall()
937 # implementations.
938 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
939 self.assertEqual(rawio.read(), b"abcdefg")
940 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
941 self.assertEqual(rawio.readall(), b"abcdefg")
942
943 def test_BufferedIOBase_readinto(self):
944 # Exercise the default BufferedIOBase.readinto() and readinto1()
945 # implementations (which call read() or read1() internally).
946 class Reader(self.BufferedIOBase):
947 def __init__(self, avail):
948 self.avail = avail
949 def read(self, size):
950 result = self.avail[:size]
951 self.avail = self.avail[size:]
952 return result
953 def read1(self, size):
954 """Returns no more than 5 bytes at once"""
955 return self.read(min(size, 5))
956 tests = (
957 # (test method, total data available, read buffer size, expected
958 # read size)
959 ("readinto", 10, 5, 5),
960 ("readinto", 10, 6, 6), # More than read1() can return
961 ("readinto", 5, 6, 5), # Buffer larger than total available
962 ("readinto", 6, 7, 6),
963 ("readinto", 10, 0, 0), # Empty buffer
964 ("readinto1", 10, 5, 5), # Result limited to single read1() call
965 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return
966 ("readinto1", 5, 6, 5), # Buffer larger than total available
967 ("readinto1", 6, 7, 5),
968 ("readinto1", 10, 0, 0), # Empty buffer
969 )
970 UNUSED_BYTE = 0x81
971 for test in tests:
972 with self.subTest(test):
973 method, avail, request, result = test
974 reader = Reader(bytes(range(avail)))
975 buffer = bytearray((UNUSED_BYTE,) * request)
976 method = getattr(reader, method)
977 self.assertEqual(method(buffer), result)
978 self.assertEqual(len(buffer), request)
979 self.assertSequenceEqual(buffer[:result], range(result))
980 unused = (UNUSED_BYTE,) * (request - result)
981 self.assertSequenceEqual(buffer[result:], unused)
982 self.assertEqual(len(reader.avail), avail - result)
983
Zackery Spytz28f07362018-07-17 00:31:44 -0600984 def test_close_assert(self):
985 class R(self.IOBase):
986 def __setattr__(self, name, value):
987 pass
988 def flush(self):
989 raise OSError()
990 f = R()
991 # This would cause an assertion failure.
992 self.assertRaises(OSError, f.close)
993
Victor Stinner472f7942019-04-12 21:58:24 +0200994 # Silence destructor error
995 R.flush = lambda self: None
996
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200997
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000998class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200999
1000 def test_IOBase_finalize(self):
1001 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
1002 # class which inherits IOBase and an object of this class are caught
1003 # in a reference cycle and close() is already in the method cache.
1004 class MyIO(self.IOBase):
1005 def close(self):
1006 pass
1007
1008 # create an instance to populate the method cache
1009 MyIO()
1010 obj = MyIO()
1011 obj.obj = obj
1012 wr = weakref.ref(obj)
1013 del MyIO
1014 del obj
1015 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001016 self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +00001017
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001018class PyIOTest(IOTest):
1019 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001020
Guido van Rossuma9e20242007-03-08 00:43:48 +00001021
Gregory P. Smith1bef9072015-04-14 13:24:34 -07001022@support.cpython_only
1023class APIMismatchTest(unittest.TestCase):
Gregory P. Smith054b0652015-04-14 12:58:05 -07001024
Gregory P. Smith054b0652015-04-14 12:58:05 -07001025 def test_RawIOBase_io_in_pyio_match(self):
1026 """Test that pyio RawIOBase class has all c RawIOBase methods"""
Antoine Pitrou45d61562015-05-20 21:50:59 +02001027 mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
1028 ignore=('__weakref__',))
Gregory P. Smith054b0652015-04-14 12:58:05 -07001029 self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
1030
1031 def test_RawIOBase_pyio_in_io_match(self):
1032 """Test that c RawIOBase class has all pyio RawIOBase methods"""
1033 mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
1034 self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
1035
1036
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001037class CommonBufferedTests:
1038 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
1039
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001040 def test_detach(self):
1041 raw = self.MockRawIO()
1042 buf = self.tp(raw)
1043 self.assertIs(buf.detach(), raw)
1044 self.assertRaises(ValueError, buf.detach)
1045
Benjamin Peterson10e76b62014-12-21 20:51:50 -06001046 repr(buf) # Should still work
1047
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001048 def test_fileno(self):
1049 rawio = self.MockRawIO()
1050 bufio = self.tp(rawio)
1051
Ezio Melottib3aedd42010-11-20 19:04:17 +00001052 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001053
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001054 def test_invalid_args(self):
1055 rawio = self.MockRawIO()
1056 bufio = self.tp(rawio)
1057 # Invalid whence
1058 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +02001059 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001060
1061 def test_override_destructor(self):
1062 tp = self.tp
1063 record = []
1064 class MyBufferedIO(tp):
1065 def __del__(self):
1066 record.append(1)
1067 try:
1068 f = super().__del__
1069 except AttributeError:
1070 pass
1071 else:
1072 f()
1073 def close(self):
1074 record.append(2)
1075 super().close()
1076 def flush(self):
1077 record.append(3)
1078 super().flush()
1079 rawio = self.MockRawIO()
1080 bufio = MyBufferedIO(rawio)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001081 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001082 support.gc_collect()
Martin Panter754aab22016-03-31 07:21:56 +00001083 self.assertEqual(record, [1, 2, 3])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001084
1085 def test_context_manager(self):
1086 # Test usability as a context manager
1087 rawio = self.MockRawIO()
1088 bufio = self.tp(rawio)
1089 def _with():
1090 with bufio:
1091 pass
1092 _with()
1093 # bufio should now be closed, and using it a second time should raise
1094 # a ValueError.
1095 self.assertRaises(ValueError, _with)
1096
1097 def test_error_through_destructor(self):
1098 # Test that the exception state is not modified by a destructor,
1099 # even if close() fails.
1100 rawio = self.CloseFailureIO()
Victor Stinner6dbbe742019-05-25 00:09:38 +02001101 with support.catch_unraisable_exception() as cm:
1102 with self.assertRaises(AttributeError):
1103 self.tp(rawio).xyzzy
Victor Stinnerbc2aa812019-05-23 03:45:09 +02001104
1105 if not IOBASE_EMITS_UNRAISABLE:
1106 self.assertIsNone(cm.unraisable)
1107 elif cm.unraisable is not None:
1108 self.assertEqual(cm.unraisable.exc_type, OSError)
Guido van Rossum78892e42007-04-06 17:31:18 +00001109
Antoine Pitrou716c4442009-05-23 19:04:03 +00001110 def test_repr(self):
1111 raw = self.MockRawIO()
1112 b = self.tp(raw)
Max Bernsteinccb7ca72019-05-21 10:09:21 -07001113 clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
1114 self.assertRegex(repr(b), "<%s>" % clsname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001115 raw.name = "dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07001116 self.assertRegex(repr(b), "<%s name='dummy'>" % clsname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001117 raw.name = b"dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07001118 self.assertRegex(repr(b), "<%s name=b'dummy'>" % clsname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001119
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02001120 def test_recursive_repr(self):
1121 # Issue #25455
1122 raw = self.MockRawIO()
1123 b = self.tp(raw)
1124 with support.swap_attr(raw, 'name', b):
1125 try:
1126 repr(b) # Should not crash
1127 except RuntimeError:
1128 pass
1129
Antoine Pitrou6be88762010-05-03 16:48:20 +00001130 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001131 # Test that buffered file is closed despite failed flush
1132 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00001133 raw = self.MockRawIO()
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001134 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00001135 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001136 closed[:] = [b.closed, raw.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001137 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00001138 raw.flush = bad_flush
1139 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001140 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001141 self.assertTrue(b.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001142 self.assertTrue(raw.closed)
1143 self.assertTrue(closed) # flush() called
1144 self.assertFalse(closed[0]) # flush() called before file closed
1145 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02001146 raw.flush = lambda: None # break reference loop
Benjamin Peterson68623612012-12-20 11:53:11 -06001147
1148 def test_close_error_on_close(self):
1149 raw = self.MockRawIO()
1150 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001151 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -06001152 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001153 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -06001154 raw.close = bad_close
1155 b = self.tp(raw)
1156 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001157 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001158 b.close()
1159 self.assertEqual(err.exception.args, ('close',))
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001160 self.assertIsInstance(err.exception.__context__, OSError)
Benjamin Peterson68623612012-12-20 11:53:11 -06001161 self.assertEqual(err.exception.__context__.args, ('flush',))
1162 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00001163
Victor Stinner472f7942019-04-12 21:58:24 +02001164 # Silence destructor error
1165 raw.close = lambda: None
1166 b.flush = lambda: None
1167
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001168 def test_nonnormalized_close_error_on_close(self):
1169 # Issue #21677
1170 raw = self.MockRawIO()
1171 def bad_flush():
1172 raise non_existing_flush
1173 def bad_close():
1174 raise non_existing_close
1175 raw.close = bad_close
1176 b = self.tp(raw)
1177 b.flush = bad_flush
1178 with self.assertRaises(NameError) as err: # exception not swallowed
1179 b.close()
1180 self.assertIn('non_existing_close', str(err.exception))
1181 self.assertIsInstance(err.exception.__context__, NameError)
1182 self.assertIn('non_existing_flush', str(err.exception.__context__))
1183 self.assertFalse(b.closed)
1184
Victor Stinner472f7942019-04-12 21:58:24 +02001185 # Silence destructor error
1186 b.flush = lambda: None
1187 raw.close = lambda: None
1188
Antoine Pitrou6be88762010-05-03 16:48:20 +00001189 def test_multi_close(self):
1190 raw = self.MockRawIO()
1191 b = self.tp(raw)
1192 b.close()
1193 b.close()
1194 b.close()
1195 self.assertRaises(ValueError, b.flush)
1196
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001197 def test_unseekable(self):
1198 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1199 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1200 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1201
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00001202 def test_readonly_attributes(self):
1203 raw = self.MockRawIO()
1204 buf = self.tp(raw)
1205 x = self.MockRawIO()
1206 with self.assertRaises(AttributeError):
1207 buf.raw = x
1208
Guido van Rossum78892e42007-04-06 17:31:18 +00001209
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001210class SizeofTest:
1211
1212 @support.cpython_only
1213 def test_sizeof(self):
1214 bufsize1 = 4096
1215 bufsize2 = 8192
1216 rawio = self.MockRawIO()
1217 bufio = self.tp(rawio, buffer_size=bufsize1)
1218 size = sys.getsizeof(bufio) - bufsize1
1219 rawio = self.MockRawIO()
1220 bufio = self.tp(rawio, buffer_size=bufsize2)
1221 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
1222
Jesus Ceadc469452012-10-04 12:37:56 +02001223 @support.cpython_only
1224 def test_buffer_freeing(self) :
1225 bufsize = 4096
1226 rawio = self.MockRawIO()
1227 bufio = self.tp(rawio, buffer_size=bufsize)
1228 size = sys.getsizeof(bufio) - bufsize
1229 bufio.close()
1230 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001231
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001232class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1233 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001234
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001235 def test_constructor(self):
1236 rawio = self.MockRawIO([b"abc"])
1237 bufio = self.tp(rawio)
1238 bufio.__init__(rawio)
1239 bufio.__init__(rawio, buffer_size=1024)
1240 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001241 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001242 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1243 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1244 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1245 rawio = self.MockRawIO([b"abc"])
1246 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001247 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001248
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001249 def test_uninitialized(self):
1250 bufio = self.tp.__new__(self.tp)
1251 del bufio
1252 bufio = self.tp.__new__(self.tp)
1253 self.assertRaisesRegex((ValueError, AttributeError),
1254 'uninitialized|has no attribute',
1255 bufio.read, 0)
1256 bufio.__init__(self.MockRawIO())
1257 self.assertEqual(bufio.read(0), b'')
1258
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001259 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001260 for arg in (None, 7):
1261 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1262 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001263 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001264 # Invalid args
1265 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001266
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001267 def test_read1(self):
1268 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1269 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001270 self.assertEqual(b"a", bufio.read(1))
1271 self.assertEqual(b"b", bufio.read1(1))
1272 self.assertEqual(rawio._reads, 1)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001273 self.assertEqual(b"", bufio.read1(0))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001274 self.assertEqual(b"c", bufio.read1(100))
1275 self.assertEqual(rawio._reads, 1)
1276 self.assertEqual(b"d", bufio.read1(100))
1277 self.assertEqual(rawio._reads, 2)
1278 self.assertEqual(b"efg", bufio.read1(100))
1279 self.assertEqual(rawio._reads, 3)
1280 self.assertEqual(b"", bufio.read1(100))
1281 self.assertEqual(rawio._reads, 4)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001282
1283 def test_read1_arbitrary(self):
1284 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1285 bufio = self.tp(rawio)
1286 self.assertEqual(b"a", bufio.read(1))
1287 self.assertEqual(b"bc", bufio.read1())
1288 self.assertEqual(b"d", bufio.read1())
1289 self.assertEqual(b"efg", bufio.read1(-1))
1290 self.assertEqual(rawio._reads, 3)
1291 self.assertEqual(b"", bufio.read1())
1292 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001293
1294 def test_readinto(self):
1295 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1296 bufio = self.tp(rawio)
1297 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001298 self.assertEqual(bufio.readinto(b), 2)
1299 self.assertEqual(b, b"ab")
1300 self.assertEqual(bufio.readinto(b), 2)
1301 self.assertEqual(b, b"cd")
1302 self.assertEqual(bufio.readinto(b), 2)
1303 self.assertEqual(b, b"ef")
1304 self.assertEqual(bufio.readinto(b), 1)
1305 self.assertEqual(b, b"gf")
1306 self.assertEqual(bufio.readinto(b), 0)
1307 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +02001308 rawio = self.MockRawIO((b"abc", None))
1309 bufio = self.tp(rawio)
1310 self.assertEqual(bufio.readinto(b), 2)
1311 self.assertEqual(b, b"ab")
1312 self.assertEqual(bufio.readinto(b), 1)
1313 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001314
Benjamin Petersona96fea02014-06-22 14:17:44 -07001315 def test_readinto1(self):
1316 buffer_size = 10
1317 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1318 bufio = self.tp(rawio, buffer_size=buffer_size)
1319 b = bytearray(2)
1320 self.assertEqual(bufio.peek(3), b'abc')
1321 self.assertEqual(rawio._reads, 1)
1322 self.assertEqual(bufio.readinto1(b), 2)
1323 self.assertEqual(b, b"ab")
1324 self.assertEqual(rawio._reads, 1)
1325 self.assertEqual(bufio.readinto1(b), 1)
1326 self.assertEqual(b[:1], b"c")
1327 self.assertEqual(rawio._reads, 1)
1328 self.assertEqual(bufio.readinto1(b), 2)
1329 self.assertEqual(b, b"de")
1330 self.assertEqual(rawio._reads, 2)
1331 b = bytearray(2*buffer_size)
1332 self.assertEqual(bufio.peek(3), b'fgh')
1333 self.assertEqual(rawio._reads, 3)
1334 self.assertEqual(bufio.readinto1(b), 6)
1335 self.assertEqual(b[:6], b"fghjkl")
1336 self.assertEqual(rawio._reads, 4)
1337
1338 def test_readinto_array(self):
1339 buffer_size = 60
1340 data = b"a" * 26
1341 rawio = self.MockRawIO((data,))
1342 bufio = self.tp(rawio, buffer_size=buffer_size)
1343
1344 # Create an array with element size > 1 byte
1345 b = array.array('i', b'x' * 32)
1346 assert len(b) != 16
1347
1348 # Read into it. We should get as many *bytes* as we can fit into b
1349 # (which is more than the number of elements)
1350 n = bufio.readinto(b)
1351 self.assertGreater(n, len(b))
1352
1353 # Check that old contents of b are preserved
1354 bm = memoryview(b).cast('B')
1355 self.assertLess(n, len(bm))
1356 self.assertEqual(bm[:n], data[:n])
1357 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1358
1359 def test_readinto1_array(self):
1360 buffer_size = 60
1361 data = b"a" * 26
1362 rawio = self.MockRawIO((data,))
1363 bufio = self.tp(rawio, buffer_size=buffer_size)
1364
1365 # Create an array with element size > 1 byte
1366 b = array.array('i', b'x' * 32)
1367 assert len(b) != 16
1368
1369 # Read into it. We should get as many *bytes* as we can fit into b
1370 # (which is more than the number of elements)
1371 n = bufio.readinto1(b)
1372 self.assertGreater(n, len(b))
1373
1374 # Check that old contents of b are preserved
1375 bm = memoryview(b).cast('B')
1376 self.assertLess(n, len(bm))
1377 self.assertEqual(bm[:n], data[:n])
1378 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1379
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001380 def test_readlines(self):
1381 def bufio():
1382 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1383 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001384 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1385 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1386 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001387
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001388 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001389 data = b"abcdefghi"
1390 dlen = len(data)
1391
1392 tests = [
1393 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1394 [ 100, [ 3, 3, 3], [ dlen ] ],
1395 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1396 ]
1397
1398 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001399 rawio = self.MockFileIO(data)
1400 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001401 pos = 0
1402 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001403 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001404 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001405 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001406 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001407
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001408 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +00001409 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001410 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1411 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001412 self.assertEqual(b"abcd", bufio.read(6))
1413 self.assertEqual(b"e", bufio.read(1))
1414 self.assertEqual(b"fg", bufio.read())
1415 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +02001416 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001417 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +00001418
Victor Stinnera80987f2011-05-25 22:47:16 +02001419 rawio = self.MockRawIO((b"a", None, None))
1420 self.assertEqual(b"a", rawio.readall())
1421 self.assertIsNone(rawio.readall())
1422
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001423 def test_read_past_eof(self):
1424 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1425 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001426
Ezio Melottib3aedd42010-11-20 19:04:17 +00001427 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001428
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001429 def test_read_all(self):
1430 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1431 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001432
Ezio Melottib3aedd42010-11-20 19:04:17 +00001433 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001434
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001435 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001436 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001437 try:
1438 # Write out many bytes with exactly the same number of 0's,
1439 # 1's... 255's. This will help us check that concurrent reading
1440 # doesn't duplicate or forget contents.
1441 N = 1000
1442 l = list(range(256)) * N
1443 random.shuffle(l)
1444 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001445 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +00001446 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001447 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001448 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001449 errors = []
1450 results = []
1451 def f():
1452 try:
1453 # Intra-buffer read then buffer-flushing read
1454 for n in cycle([1, 19]):
1455 s = bufio.read(n)
1456 if not s:
1457 break
1458 # list.append() is atomic
1459 results.append(s)
1460 except Exception as e:
1461 errors.append(e)
1462 raise
1463 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001464 with support.start_threads(threads):
1465 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001466 self.assertFalse(errors,
1467 "the following exceptions were caught: %r" % errors)
1468 s = b''.join(results)
1469 for i in range(256):
1470 c = bytes(bytearray([i]))
1471 self.assertEqual(s.count(c), N)
1472 finally:
1473 support.unlink(support.TESTFN)
1474
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001475 def test_unseekable(self):
1476 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1477 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1478 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1479 bufio.read(1)
1480 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1481 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1482
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001483 def test_misbehaved_io(self):
1484 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1485 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001486 self.assertRaises(OSError, bufio.seek, 0)
1487 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001488
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001489 def test_no_extraneous_read(self):
1490 # Issue #9550; when the raw IO object has satisfied the read request,
1491 # we should not issue any additional reads, otherwise it may block
1492 # (e.g. socket).
1493 bufsize = 16
1494 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1495 rawio = self.MockRawIO([b"x" * n])
1496 bufio = self.tp(rawio, bufsize)
1497 self.assertEqual(bufio.read(n), b"x" * n)
1498 # Simple case: one raw read is enough to satisfy the request.
1499 self.assertEqual(rawio._extraneous_reads, 0,
1500 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1501 # A more complex case where two raw reads are needed to satisfy
1502 # the request.
1503 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1504 bufio = self.tp(rawio, bufsize)
1505 self.assertEqual(bufio.read(n), b"x" * n)
1506 self.assertEqual(rawio._extraneous_reads, 0,
1507 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1508
Berker Peksagd10d6ae2015-05-12 17:01:05 +03001509 def test_read_on_closed(self):
1510 # Issue #23796
1511 b = io.BufferedReader(io.BytesIO(b"12"))
1512 b.read(1)
1513 b.close()
1514 self.assertRaises(ValueError, b.peek)
1515 self.assertRaises(ValueError, b.read1, 1)
1516
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001517
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001518class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001519 tp = io.BufferedReader
1520
Gregory P. Smithe5796c42018-12-30 20:17:57 -08001521 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
1522 "instead of returning NULL for malloc failure.")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001523 def test_constructor(self):
1524 BufferedReaderTest.test_constructor(self)
1525 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08001526 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001527 if sys.maxsize > 0x7FFFFFFF:
1528 rawio = self.MockRawIO()
1529 bufio = self.tp(rawio)
1530 self.assertRaises((OverflowError, MemoryError, ValueError),
1531 bufio.__init__, rawio, sys.maxsize)
1532
1533 def test_initialization(self):
1534 rawio = self.MockRawIO([b"abc"])
1535 bufio = self.tp(rawio)
1536 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1537 self.assertRaises(ValueError, bufio.read)
1538 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1539 self.assertRaises(ValueError, bufio.read)
1540 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1541 self.assertRaises(ValueError, bufio.read)
1542
1543 def test_misbehaved_io_read(self):
1544 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1545 bufio = self.tp(rawio)
1546 # _pyio.BufferedReader seems to implement reading different, so that
1547 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001548 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001549
1550 def test_garbage_collection(self):
1551 # C BufferedReader objects are collected.
1552 # The Python version has __del__, so it ends into gc.garbage instead
Serhiy Storchakae36837c2018-06-04 05:53:56 +03001553 self.addCleanup(support.unlink, support.TESTFN)
Antoine Pitrou796564c2013-07-30 19:59:21 +02001554 with support.check_warnings(('', ResourceWarning)):
1555 rawio = self.FileIO(support.TESTFN, "w+b")
1556 f = self.tp(rawio)
1557 f.f = f
1558 wr = weakref.ref(f)
1559 del f
1560 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001561 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001562
R David Murray67bfe802013-02-23 21:51:05 -05001563 def test_args_error(self):
1564 # Issue #17275
1565 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1566 self.tp(io.BytesIO(), 1024, 1024, 1024)
1567
1568
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001569class PyBufferedReaderTest(BufferedReaderTest):
1570 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001571
Guido van Rossuma9e20242007-03-08 00:43:48 +00001572
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001573class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1574 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001575
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001576 def test_constructor(self):
1577 rawio = self.MockRawIO()
1578 bufio = self.tp(rawio)
1579 bufio.__init__(rawio)
1580 bufio.__init__(rawio, buffer_size=1024)
1581 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001582 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001583 bufio.flush()
1584 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1585 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1586 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1587 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001588 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001589 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001590 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001591
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001592 def test_uninitialized(self):
1593 bufio = self.tp.__new__(self.tp)
1594 del bufio
1595 bufio = self.tp.__new__(self.tp)
1596 self.assertRaisesRegex((ValueError, AttributeError),
1597 'uninitialized|has no attribute',
1598 bufio.write, b'')
1599 bufio.__init__(self.MockRawIO())
1600 self.assertEqual(bufio.write(b''), 0)
1601
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001602 def test_detach_flush(self):
1603 raw = self.MockRawIO()
1604 buf = self.tp(raw)
1605 buf.write(b"howdy!")
1606 self.assertFalse(raw._write_stack)
1607 buf.detach()
1608 self.assertEqual(raw._write_stack, [b"howdy!"])
1609
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001610 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001611 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001612 writer = self.MockRawIO()
1613 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001614 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001615 self.assertFalse(writer._write_stack)
Martin Panter6bb91f32016-05-28 00:41:57 +00001616 buffer = bytearray(b"def")
1617 bufio.write(buffer)
1618 buffer[:] = b"***" # Overwrite our copy of the data
1619 bufio.flush()
1620 self.assertEqual(b"".join(writer._write_stack), b"abcdef")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001621
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001622 def test_write_overflow(self):
1623 writer = self.MockRawIO()
1624 bufio = self.tp(writer, 8)
1625 contents = b"abcdefghijklmnop"
1626 for n in range(0, len(contents), 3):
1627 bufio.write(contents[n:n+3])
1628 flushed = b"".join(writer._write_stack)
1629 # At least (total - 8) bytes were implicitly flushed, perhaps more
1630 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001631 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001632
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001633 def check_writes(self, intermediate_func):
1634 # Lots of writes, test the flushed output is as expected.
1635 contents = bytes(range(256)) * 1000
1636 n = 0
1637 writer = self.MockRawIO()
1638 bufio = self.tp(writer, 13)
1639 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1640 def gen_sizes():
1641 for size in count(1):
1642 for i in range(15):
1643 yield size
1644 sizes = gen_sizes()
1645 while n < len(contents):
1646 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001647 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001648 intermediate_func(bufio)
1649 n += size
1650 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001651 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001652
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001653 def test_writes(self):
1654 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001655
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001656 def test_writes_and_flushes(self):
1657 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001658
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001659 def test_writes_and_seeks(self):
1660 def _seekabs(bufio):
1661 pos = bufio.tell()
1662 bufio.seek(pos + 1, 0)
1663 bufio.seek(pos - 1, 0)
1664 bufio.seek(pos, 0)
1665 self.check_writes(_seekabs)
1666 def _seekrel(bufio):
1667 pos = bufio.seek(0, 1)
1668 bufio.seek(+1, 1)
1669 bufio.seek(-1, 1)
1670 bufio.seek(pos, 0)
1671 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001672
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001673 def test_writes_and_truncates(self):
1674 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001675
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001676 def test_write_non_blocking(self):
1677 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001678 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001679
Ezio Melottib3aedd42010-11-20 19:04:17 +00001680 self.assertEqual(bufio.write(b"abcd"), 4)
1681 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001682 # 1 byte will be written, the rest will be buffered
1683 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001684 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001685
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001686 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1687 raw.block_on(b"0")
1688 try:
1689 bufio.write(b"opqrwxyz0123456789")
1690 except self.BlockingIOError as e:
1691 written = e.characters_written
1692 else:
1693 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001694 self.assertEqual(written, 16)
1695 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001696 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001697
Ezio Melottib3aedd42010-11-20 19:04:17 +00001698 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001699 s = raw.pop_written()
1700 # Previously buffered bytes were flushed
1701 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001702
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001703 def test_write_and_rewind(self):
1704 raw = io.BytesIO()
1705 bufio = self.tp(raw, 4)
1706 self.assertEqual(bufio.write(b"abcdef"), 6)
1707 self.assertEqual(bufio.tell(), 6)
1708 bufio.seek(0, 0)
1709 self.assertEqual(bufio.write(b"XY"), 2)
1710 bufio.seek(6, 0)
1711 self.assertEqual(raw.getvalue(), b"XYcdef")
1712 self.assertEqual(bufio.write(b"123456"), 6)
1713 bufio.flush()
1714 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001715
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001716 def test_flush(self):
1717 writer = self.MockRawIO()
1718 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001719 bufio.write(b"abc")
1720 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001721 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001722
Antoine Pitrou131a4892012-10-16 22:57:11 +02001723 def test_writelines(self):
1724 l = [b'ab', b'cd', b'ef']
1725 writer = self.MockRawIO()
1726 bufio = self.tp(writer, 8)
1727 bufio.writelines(l)
1728 bufio.flush()
1729 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1730
1731 def test_writelines_userlist(self):
1732 l = UserList([b'ab', b'cd', b'ef'])
1733 writer = self.MockRawIO()
1734 bufio = self.tp(writer, 8)
1735 bufio.writelines(l)
1736 bufio.flush()
1737 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1738
1739 def test_writelines_error(self):
1740 writer = self.MockRawIO()
1741 bufio = self.tp(writer, 8)
1742 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1743 self.assertRaises(TypeError, bufio.writelines, None)
1744 self.assertRaises(TypeError, bufio.writelines, 'abc')
1745
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001746 def test_destructor(self):
1747 writer = self.MockRawIO()
1748 bufio = self.tp(writer, 8)
1749 bufio.write(b"abc")
1750 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001751 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001752 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001753
1754 def test_truncate(self):
1755 # Truncate implicitly flushes the buffer.
Serhiy Storchakae36837c2018-06-04 05:53:56 +03001756 self.addCleanup(support.unlink, support.TESTFN)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001757 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001758 bufio = self.tp(raw, 8)
1759 bufio.write(b"abcdef")
1760 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001761 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001762 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001763 self.assertEqual(f.read(), b"abc")
1764
Nitish Chandra059f58c2018-01-28 21:30:09 +05301765 def test_truncate_after_write(self):
1766 # Ensure that truncate preserves the file position after
1767 # writes longer than the buffer size.
1768 # Issue: https://bugs.python.org/issue32228
Serhiy Storchakae36837c2018-06-04 05:53:56 +03001769 self.addCleanup(support.unlink, support.TESTFN)
Nitish Chandra059f58c2018-01-28 21:30:09 +05301770 with self.open(support.TESTFN, "wb") as f:
1771 # Fill with some buffer
1772 f.write(b'\x00' * 10000)
1773 buffer_sizes = [8192, 4096, 200]
1774 for buffer_size in buffer_sizes:
1775 with self.open(support.TESTFN, "r+b", buffering=buffer_size) as f:
1776 f.write(b'\x00' * (buffer_size + 1))
1777 # After write write_pos and write_end are set to 0
1778 f.read(1)
1779 # read operation makes sure that pos != raw_pos
1780 f.truncate()
1781 self.assertEqual(f.tell(), buffer_size + 2)
1782
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001783 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001784 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001785 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001786 # Write out many bytes from many threads and test they were
1787 # all flushed.
1788 N = 1000
1789 contents = bytes(range(256)) * N
1790 sizes = cycle([1, 19])
1791 n = 0
1792 queue = deque()
1793 while n < len(contents):
1794 size = next(sizes)
1795 queue.append(contents[n:n+size])
1796 n += size
1797 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001798 # We use a real file object because it allows us to
1799 # exercise situations where the GIL is released before
1800 # writing the buffer to the raw streams. This is in addition
1801 # to concurrency issues due to switching threads in the middle
1802 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001803 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001804 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001805 errors = []
1806 def f():
1807 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001808 while True:
1809 try:
1810 s = queue.popleft()
1811 except IndexError:
1812 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001813 bufio.write(s)
1814 except Exception as e:
1815 errors.append(e)
1816 raise
1817 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001818 with support.start_threads(threads):
1819 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001820 self.assertFalse(errors,
1821 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001822 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001823 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001824 s = f.read()
1825 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001826 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001827 finally:
1828 support.unlink(support.TESTFN)
1829
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001830 def test_misbehaved_io(self):
1831 rawio = self.MisbehavedRawIO()
1832 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001833 self.assertRaises(OSError, bufio.seek, 0)
1834 self.assertRaises(OSError, bufio.tell)
1835 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001836
Florent Xicluna109d5732012-07-07 17:03:22 +02001837 def test_max_buffer_size_removal(self):
1838 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001839 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001840
Benjamin Peterson68623612012-12-20 11:53:11 -06001841 def test_write_error_on_close(self):
1842 raw = self.MockRawIO()
1843 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001844 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001845 raw.write = bad_write
1846 b = self.tp(raw)
1847 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001848 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001849 self.assertTrue(b.closed)
1850
benfogle9703f092017-11-10 16:03:40 -05001851 def test_slow_close_from_thread(self):
1852 # Issue #31976
1853 rawio = self.SlowFlushRawIO()
1854 bufio = self.tp(rawio, 8)
1855 t = threading.Thread(target=bufio.close)
1856 t.start()
1857 rawio.in_flush.wait()
1858 self.assertRaises(ValueError, bufio.write, b'spam')
1859 self.assertTrue(bufio.closed)
1860 t.join()
1861
1862
Benjamin Peterson59406a92009-03-26 17:10:29 +00001863
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001864class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001865 tp = io.BufferedWriter
1866
Gregory P. Smithe5796c42018-12-30 20:17:57 -08001867 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
1868 "instead of returning NULL for malloc failure.")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001869 def test_constructor(self):
1870 BufferedWriterTest.test_constructor(self)
1871 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08001872 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001873 if sys.maxsize > 0x7FFFFFFF:
1874 rawio = self.MockRawIO()
1875 bufio = self.tp(rawio)
1876 self.assertRaises((OverflowError, MemoryError, ValueError),
1877 bufio.__init__, rawio, sys.maxsize)
1878
1879 def test_initialization(self):
1880 rawio = self.MockRawIO()
1881 bufio = self.tp(rawio)
1882 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1883 self.assertRaises(ValueError, bufio.write, b"def")
1884 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1885 self.assertRaises(ValueError, bufio.write, b"def")
1886 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1887 self.assertRaises(ValueError, bufio.write, b"def")
1888
1889 def test_garbage_collection(self):
1890 # C BufferedWriter objects are collected, and collecting them flushes
1891 # all data to disk.
1892 # The Python version has __del__, so it ends into gc.garbage instead
Serhiy Storchakae36837c2018-06-04 05:53:56 +03001893 self.addCleanup(support.unlink, support.TESTFN)
Antoine Pitrou796564c2013-07-30 19:59:21 +02001894 with support.check_warnings(('', ResourceWarning)):
1895 rawio = self.FileIO(support.TESTFN, "w+b")
1896 f = self.tp(rawio)
1897 f.write(b"123xxx")
1898 f.x = f
1899 wr = weakref.ref(f)
1900 del f
1901 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001902 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001903 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001904 self.assertEqual(f.read(), b"123xxx")
1905
R David Murray67bfe802013-02-23 21:51:05 -05001906 def test_args_error(self):
1907 # Issue #17275
1908 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1909 self.tp(io.BytesIO(), 1024, 1024, 1024)
1910
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001911
1912class PyBufferedWriterTest(BufferedWriterTest):
1913 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001914
Guido van Rossum01a27522007-03-07 01:00:12 +00001915class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001916
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001917 def test_constructor(self):
1918 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001919 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001920
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001921 def test_uninitialized(self):
1922 pair = self.tp.__new__(self.tp)
1923 del pair
1924 pair = self.tp.__new__(self.tp)
1925 self.assertRaisesRegex((ValueError, AttributeError),
1926 'uninitialized|has no attribute',
1927 pair.read, 0)
1928 self.assertRaisesRegex((ValueError, AttributeError),
1929 'uninitialized|has no attribute',
1930 pair.write, b'')
1931 pair.__init__(self.MockRawIO(), self.MockRawIO())
1932 self.assertEqual(pair.read(0), b'')
1933 self.assertEqual(pair.write(b''), 0)
1934
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001935 def test_detach(self):
1936 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1937 self.assertRaises(self.UnsupportedOperation, pair.detach)
1938
Florent Xicluna109d5732012-07-07 17:03:22 +02001939 def test_constructor_max_buffer_size_removal(self):
1940 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001941 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001942
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001943 def test_constructor_with_not_readable(self):
1944 class NotReadable(MockRawIO):
1945 def readable(self):
1946 return False
1947
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001948 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001949
1950 def test_constructor_with_not_writeable(self):
1951 class NotWriteable(MockRawIO):
1952 def writable(self):
1953 return False
1954
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001955 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001956
1957 def test_read(self):
1958 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1959
1960 self.assertEqual(pair.read(3), b"abc")
1961 self.assertEqual(pair.read(1), b"d")
1962 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001963 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1964 self.assertEqual(pair.read(None), b"abc")
1965
1966 def test_readlines(self):
1967 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1968 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1969 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1970 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001971
1972 def test_read1(self):
1973 # .read1() is delegated to the underlying reader object, so this test
1974 # can be shallow.
1975 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1976
1977 self.assertEqual(pair.read1(3), b"abc")
Martin Panterccb2c0e2016-10-20 23:48:14 +00001978 self.assertEqual(pair.read1(), b"def")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001979
1980 def test_readinto(self):
Martin Panter6bb91f32016-05-28 00:41:57 +00001981 for method in ("readinto", "readinto1"):
1982 with self.subTest(method):
1983 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001984
Serhiy Storchaka5f1a5182016-09-11 14:41:02 +03001985 data = byteslike(b'\0' * 5)
Martin Panter6bb91f32016-05-28 00:41:57 +00001986 self.assertEqual(getattr(pair, method)(data), 5)
1987 self.assertEqual(bytes(data), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001988
1989 def test_write(self):
1990 w = self.MockRawIO()
1991 pair = self.tp(self.MockRawIO(), w)
1992
1993 pair.write(b"abc")
1994 pair.flush()
Martin Panter6bb91f32016-05-28 00:41:57 +00001995 buffer = bytearray(b"def")
1996 pair.write(buffer)
1997 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001998 pair.flush()
1999 self.assertEqual(w._write_stack, [b"abc", b"def"])
2000
2001 def test_peek(self):
2002 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2003
2004 self.assertTrue(pair.peek(3).startswith(b"abc"))
2005 self.assertEqual(pair.read(3), b"abc")
2006
2007 def test_readable(self):
2008 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2009 self.assertTrue(pair.readable())
2010
2011 def test_writeable(self):
2012 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2013 self.assertTrue(pair.writable())
2014
2015 def test_seekable(self):
2016 # BufferedRWPairs are never seekable, even if their readers and writers
2017 # are.
2018 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2019 self.assertFalse(pair.seekable())
2020
2021 # .flush() is delegated to the underlying writer object and has been
2022 # tested in the test_write method.
2023
2024 def test_close_and_closed(self):
2025 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2026 self.assertFalse(pair.closed)
2027 pair.close()
2028 self.assertTrue(pair.closed)
2029
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002030 def test_reader_close_error_on_close(self):
2031 def reader_close():
2032 reader_non_existing
2033 reader = self.MockRawIO()
2034 reader.close = reader_close
2035 writer = self.MockRawIO()
2036 pair = self.tp(reader, writer)
2037 with self.assertRaises(NameError) as err:
2038 pair.close()
2039 self.assertIn('reader_non_existing', str(err.exception))
2040 self.assertTrue(pair.closed)
2041 self.assertFalse(reader.closed)
2042 self.assertTrue(writer.closed)
2043
Victor Stinner472f7942019-04-12 21:58:24 +02002044 # Silence destructor error
2045 reader.close = lambda: None
2046
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002047 def test_writer_close_error_on_close(self):
2048 def writer_close():
2049 writer_non_existing
2050 reader = self.MockRawIO()
2051 writer = self.MockRawIO()
2052 writer.close = writer_close
2053 pair = self.tp(reader, writer)
2054 with self.assertRaises(NameError) as err:
2055 pair.close()
2056 self.assertIn('writer_non_existing', str(err.exception))
2057 self.assertFalse(pair.closed)
2058 self.assertTrue(reader.closed)
2059 self.assertFalse(writer.closed)
2060
Victor Stinner472f7942019-04-12 21:58:24 +02002061 # Silence destructor error
2062 writer.close = lambda: None
2063
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002064 def test_reader_writer_close_error_on_close(self):
2065 def reader_close():
2066 reader_non_existing
2067 def writer_close():
2068 writer_non_existing
2069 reader = self.MockRawIO()
2070 reader.close = reader_close
2071 writer = self.MockRawIO()
2072 writer.close = writer_close
2073 pair = self.tp(reader, writer)
2074 with self.assertRaises(NameError) as err:
2075 pair.close()
2076 self.assertIn('reader_non_existing', str(err.exception))
2077 self.assertIsInstance(err.exception.__context__, NameError)
2078 self.assertIn('writer_non_existing', str(err.exception.__context__))
2079 self.assertFalse(pair.closed)
2080 self.assertFalse(reader.closed)
2081 self.assertFalse(writer.closed)
2082
Victor Stinner472f7942019-04-12 21:58:24 +02002083 # Silence destructor error
2084 reader.close = lambda: None
2085 writer.close = lambda: None
2086
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002087 def test_isatty(self):
2088 class SelectableIsAtty(MockRawIO):
2089 def __init__(self, isatty):
2090 MockRawIO.__init__(self)
2091 self._isatty = isatty
2092
2093 def isatty(self):
2094 return self._isatty
2095
2096 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
2097 self.assertFalse(pair.isatty())
2098
2099 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
2100 self.assertTrue(pair.isatty())
2101
2102 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
2103 self.assertTrue(pair.isatty())
2104
2105 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
2106 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00002107
Benjamin Petersonbbd0a322014-09-29 22:46:57 -04002108 def test_weakref_clearing(self):
2109 brw = self.tp(self.MockRawIO(), self.MockRawIO())
2110 ref = weakref.ref(brw)
2111 brw = None
2112 ref = None # Shouldn't segfault.
2113
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002114class CBufferedRWPairTest(BufferedRWPairTest):
2115 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002116
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002117class PyBufferedRWPairTest(BufferedRWPairTest):
2118 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002119
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002120
2121class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
2122 read_mode = "rb+"
2123 write_mode = "wb+"
2124
2125 def test_constructor(self):
2126 BufferedReaderTest.test_constructor(self)
2127 BufferedWriterTest.test_constructor(self)
2128
Serhiy Storchaka61e24932014-02-12 10:52:35 +02002129 def test_uninitialized(self):
2130 BufferedReaderTest.test_uninitialized(self)
2131 BufferedWriterTest.test_uninitialized(self)
2132
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002133 def test_read_and_write(self):
2134 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00002135 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00002136
2137 self.assertEqual(b"as", rw.read(2))
2138 rw.write(b"ddd")
2139 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002140 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002141 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002142 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00002143
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002144 def test_seek_and_tell(self):
2145 raw = self.BytesIO(b"asdfghjkl")
2146 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00002147
Ezio Melottib3aedd42010-11-20 19:04:17 +00002148 self.assertEqual(b"as", rw.read(2))
2149 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002150 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002151 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00002152
Antoine Pitroue05565e2011-08-20 14:39:23 +02002153 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00002154 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02002155 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002156 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002157 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002158 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002159 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002160 self.assertEqual(7, rw.tell())
2161 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02002162 rw.flush()
2163 self.assertEqual(b"asdf123fl", raw.getvalue())
2164
Christian Heimes8e42a0a2007-11-08 18:04:45 +00002165 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00002166
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002167 def check_flush_and_read(self, read_func):
2168 raw = self.BytesIO(b"abcdefghi")
2169 bufio = self.tp(raw)
2170
Ezio Melottib3aedd42010-11-20 19:04:17 +00002171 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002172 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002173 self.assertEqual(b"ef", read_func(bufio, 2))
2174 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002175 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002176 self.assertEqual(6, bufio.tell())
2177 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002178 raw.seek(0, 0)
2179 raw.write(b"XYZ")
2180 # flush() resets the read buffer
2181 bufio.flush()
2182 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002183 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002184
2185 def test_flush_and_read(self):
2186 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2187
2188 def test_flush_and_readinto(self):
2189 def _readinto(bufio, n=-1):
2190 b = bytearray(n if n >= 0 else 9999)
2191 n = bufio.readinto(b)
2192 return bytes(b[:n])
2193 self.check_flush_and_read(_readinto)
2194
2195 def test_flush_and_peek(self):
2196 def _peek(bufio, n=-1):
2197 # This relies on the fact that the buffer can contain the whole
2198 # raw stream, otherwise peek() can return less.
2199 b = bufio.peek(n)
2200 if n != -1:
2201 b = b[:n]
2202 bufio.seek(len(b), 1)
2203 return b
2204 self.check_flush_and_read(_peek)
2205
2206 def test_flush_and_write(self):
2207 raw = self.BytesIO(b"abcdefghi")
2208 bufio = self.tp(raw)
2209
2210 bufio.write(b"123")
2211 bufio.flush()
2212 bufio.write(b"45")
2213 bufio.flush()
2214 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002215 self.assertEqual(b"12345fghi", raw.getvalue())
2216 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002217
2218 def test_threads(self):
2219 BufferedReaderTest.test_threads(self)
2220 BufferedWriterTest.test_threads(self)
2221
2222 def test_writes_and_peek(self):
2223 def _peek(bufio):
2224 bufio.peek(1)
2225 self.check_writes(_peek)
2226 def _peek(bufio):
2227 pos = bufio.tell()
2228 bufio.seek(-1, 1)
2229 bufio.peek(1)
2230 bufio.seek(pos, 0)
2231 self.check_writes(_peek)
2232
2233 def test_writes_and_reads(self):
2234 def _read(bufio):
2235 bufio.seek(-1, 1)
2236 bufio.read(1)
2237 self.check_writes(_read)
2238
2239 def test_writes_and_read1s(self):
2240 def _read1(bufio):
2241 bufio.seek(-1, 1)
2242 bufio.read1(1)
2243 self.check_writes(_read1)
2244
2245 def test_writes_and_readintos(self):
2246 def _read(bufio):
2247 bufio.seek(-1, 1)
2248 bufio.readinto(bytearray(1))
2249 self.check_writes(_read)
2250
Antoine Pitroua0ceb732009-08-06 20:29:56 +00002251 def test_write_after_readahead(self):
2252 # Issue #6629: writing after the buffer was filled by readahead should
2253 # first rewind the raw stream.
2254 for overwrite_size in [1, 5]:
2255 raw = self.BytesIO(b"A" * 10)
2256 bufio = self.tp(raw, 4)
2257 # Trigger readahead
2258 self.assertEqual(bufio.read(1), b"A")
2259 self.assertEqual(bufio.tell(), 1)
2260 # Overwriting should rewind the raw stream if it needs so
2261 bufio.write(b"B" * overwrite_size)
2262 self.assertEqual(bufio.tell(), overwrite_size + 1)
2263 # If the write size was smaller than the buffer size, flush() and
2264 # check that rewind happens.
2265 bufio.flush()
2266 self.assertEqual(bufio.tell(), overwrite_size + 1)
2267 s = raw.getvalue()
2268 self.assertEqual(s,
2269 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2270
Antoine Pitrou7c404892011-05-13 00:13:33 +02002271 def test_write_rewind_write(self):
2272 # Various combinations of reading / writing / seeking backwards / writing again
2273 def mutate(bufio, pos1, pos2):
2274 assert pos2 >= pos1
2275 # Fill the buffer
2276 bufio.seek(pos1)
2277 bufio.read(pos2 - pos1)
2278 bufio.write(b'\x02')
2279 # This writes earlier than the previous write, but still inside
2280 # the buffer.
2281 bufio.seek(pos1)
2282 bufio.write(b'\x01')
2283
2284 b = b"\x80\x81\x82\x83\x84"
2285 for i in range(0, len(b)):
2286 for j in range(i, len(b)):
2287 raw = self.BytesIO(b)
2288 bufio = self.tp(raw, 100)
2289 mutate(bufio, i, j)
2290 bufio.flush()
2291 expected = bytearray(b)
2292 expected[j] = 2
2293 expected[i] = 1
2294 self.assertEqual(raw.getvalue(), expected,
2295 "failed result for i=%d, j=%d" % (i, j))
2296
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00002297 def test_truncate_after_read_or_write(self):
2298 raw = self.BytesIO(b"A" * 10)
2299 bufio = self.tp(raw, 100)
2300 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2301 self.assertEqual(bufio.truncate(), 2)
2302 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2303 self.assertEqual(bufio.truncate(), 4)
2304
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002305 def test_misbehaved_io(self):
2306 BufferedReaderTest.test_misbehaved_io(self)
2307 BufferedWriterTest.test_misbehaved_io(self)
2308
Antoine Pitroue05565e2011-08-20 14:39:23 +02002309 def test_interleaved_read_write(self):
2310 # Test for issue #12213
2311 with self.BytesIO(b'abcdefgh') as raw:
2312 with self.tp(raw, 100) as f:
2313 f.write(b"1")
2314 self.assertEqual(f.read(1), b'b')
2315 f.write(b'2')
2316 self.assertEqual(f.read1(1), b'd')
2317 f.write(b'3')
2318 buf = bytearray(1)
2319 f.readinto(buf)
2320 self.assertEqual(buf, b'f')
2321 f.write(b'4')
2322 self.assertEqual(f.peek(1), b'h')
2323 f.flush()
2324 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2325
2326 with self.BytesIO(b'abc') as raw:
2327 with self.tp(raw, 100) as f:
2328 self.assertEqual(f.read(1), b'a')
2329 f.write(b"2")
2330 self.assertEqual(f.read(1), b'c')
2331 f.flush()
2332 self.assertEqual(raw.getvalue(), b'a2c')
2333
2334 def test_interleaved_readline_write(self):
2335 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2336 with self.tp(raw) as f:
2337 f.write(b'1')
2338 self.assertEqual(f.readline(), b'b\n')
2339 f.write(b'2')
2340 self.assertEqual(f.readline(), b'def\n')
2341 f.write(b'3')
2342 self.assertEqual(f.readline(), b'\n')
2343 f.flush()
2344 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2345
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002346 # You can't construct a BufferedRandom over a non-seekable stream.
2347 test_unseekable = None
2348
R David Murray67bfe802013-02-23 21:51:05 -05002349
Antoine Pitrou10f0c502012-07-29 19:02:46 +02002350class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002351 tp = io.BufferedRandom
2352
Gregory P. Smithe5796c42018-12-30 20:17:57 -08002353 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
2354 "instead of returning NULL for malloc failure.")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002355 def test_constructor(self):
2356 BufferedRandomTest.test_constructor(self)
2357 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08002358 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002359 if sys.maxsize > 0x7FFFFFFF:
2360 rawio = self.MockRawIO()
2361 bufio = self.tp(rawio)
2362 self.assertRaises((OverflowError, MemoryError, ValueError),
2363 bufio.__init__, rawio, sys.maxsize)
2364
2365 def test_garbage_collection(self):
2366 CBufferedReaderTest.test_garbage_collection(self)
2367 CBufferedWriterTest.test_garbage_collection(self)
2368
R David Murray67bfe802013-02-23 21:51:05 -05002369 def test_args_error(self):
2370 # Issue #17275
2371 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
2372 self.tp(io.BytesIO(), 1024, 1024, 1024)
2373
2374
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002375class PyBufferedRandomTest(BufferedRandomTest):
2376 tp = pyio.BufferedRandom
2377
2378
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002379# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2380# properties:
2381# - A single output character can correspond to many bytes of input.
2382# - The number of input bytes to complete the character can be
2383# undetermined until the last input byte is received.
2384# - The number of input bytes can vary depending on previous input.
2385# - A single input byte can correspond to many characters of output.
2386# - The number of output characters can be undetermined until the
2387# last input byte is received.
2388# - The number of output characters can vary depending on previous input.
2389
2390class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
2391 """
2392 For testing seek/tell behavior with a stateful, buffering decoder.
2393
2394 Input is a sequence of words. Words may be fixed-length (length set
2395 by input) or variable-length (period-terminated). In variable-length
2396 mode, extra periods are ignored. Possible words are:
2397 - 'i' followed by a number sets the input length, I (maximum 99).
2398 When I is set to 0, words are space-terminated.
2399 - 'o' followed by a number sets the output length, O (maximum 99).
2400 - Any other word is converted into a word followed by a period on
2401 the output. The output word consists of the input word truncated
2402 or padded out with hyphens to make its length equal to O. If O
2403 is 0, the word is output verbatim without truncating or padding.
2404 I and O are initially set to 1. When I changes, any buffered input is
2405 re-scanned according to the new I. EOF also terminates the last word.
2406 """
2407
2408 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00002409 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002410 self.reset()
2411
2412 def __repr__(self):
2413 return '<SID %x>' % id(self)
2414
2415 def reset(self):
2416 self.i = 1
2417 self.o = 1
2418 self.buffer = bytearray()
2419
2420 def getstate(self):
2421 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
2422 return bytes(self.buffer), i*100 + o
2423
2424 def setstate(self, state):
2425 buffer, io = state
2426 self.buffer = bytearray(buffer)
2427 i, o = divmod(io, 100)
2428 self.i, self.o = i ^ 1, o ^ 1
2429
2430 def decode(self, input, final=False):
2431 output = ''
2432 for b in input:
2433 if self.i == 0: # variable-length, terminated with period
2434 if b == ord('.'):
2435 if self.buffer:
2436 output += self.process_word()
2437 else:
2438 self.buffer.append(b)
2439 else: # fixed-length, terminate after self.i bytes
2440 self.buffer.append(b)
2441 if len(self.buffer) == self.i:
2442 output += self.process_word()
2443 if final and self.buffer: # EOF terminates the last word
2444 output += self.process_word()
2445 return output
2446
2447 def process_word(self):
2448 output = ''
2449 if self.buffer[0] == ord('i'):
2450 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2451 elif self.buffer[0] == ord('o'):
2452 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2453 else:
2454 output = self.buffer.decode('ascii')
2455 if len(output) < self.o:
2456 output += '-'*self.o # pad out with hyphens
2457 if self.o:
2458 output = output[:self.o] # truncate to output length
2459 output += '.'
2460 self.buffer = bytearray()
2461 return output
2462
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002463 codecEnabled = False
2464
2465 @classmethod
2466 def lookupTestDecoder(cls, name):
2467 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00002468 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002469 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00002470 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002471 incrementalencoder=None,
2472 streamreader=None, streamwriter=None,
2473 incrementaldecoder=cls)
2474
2475# Register the previous decoder for testing.
2476# Disabled by default, tests will enable it.
2477codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2478
2479
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002480class StatefulIncrementalDecoderTest(unittest.TestCase):
2481 """
2482 Make sure the StatefulIncrementalDecoder actually works.
2483 """
2484
2485 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002486 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002487 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002488 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002489 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002490 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002491 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002492 # I=0, O=6 (variable-length input, fixed-length output)
2493 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2494 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002495 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002496 # I=6, O=3 (fixed-length input > fixed-length output)
2497 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2498 # I=0, then 3; O=29, then 15 (with longer output)
2499 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2500 'a----------------------------.' +
2501 'b----------------------------.' +
2502 'cde--------------------------.' +
2503 'abcdefghijabcde.' +
2504 'a.b------------.' +
2505 '.c.------------.' +
2506 'd.e------------.' +
2507 'k--------------.' +
2508 'l--------------.' +
2509 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002510 ]
2511
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002512 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002513 # Try a few one-shot test cases.
2514 for input, eof, output in self.test_cases:
2515 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002516 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002517
2518 # Also test an unfinished decode, followed by forcing EOF.
2519 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002520 self.assertEqual(d.decode(b'oiabcd'), '')
2521 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002522
2523class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002524
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002525 def setUp(self):
2526 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2527 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002528 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002529
Guido van Rossumd0712812007-04-11 16:32:43 +00002530 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002531 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002532
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002533 def test_constructor(self):
2534 r = self.BytesIO(b"\xc3\xa9\n\n")
2535 b = self.BufferedReader(r, 1000)
2536 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002537 t.__init__(b, encoding="latin-1", newline="\r\n")
2538 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002539 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002540 t.__init__(b, encoding="utf-8", line_buffering=True)
2541 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002542 self.assertEqual(t.line_buffering, True)
2543 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002544 self.assertRaises(TypeError, t.__init__, b, newline=42)
2545 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2546
Serhiy Storchakaf24131f2015-04-16 11:19:43 +03002547 def test_uninitialized(self):
2548 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2549 del t
2550 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2551 self.assertRaises(Exception, repr, t)
2552 self.assertRaisesRegex((ValueError, AttributeError),
2553 'uninitialized|has no attribute',
2554 t.read, 0)
2555 t.__init__(self.MockRawIO())
2556 self.assertEqual(t.read(0), '')
2557
Nick Coghlana9b15242014-02-04 22:11:18 +10002558 def test_non_text_encoding_codecs_are_rejected(self):
2559 # Ensure the constructor complains if passed a codec that isn't
2560 # marked as a text encoding
2561 # http://bugs.python.org/issue20404
2562 r = self.BytesIO()
2563 b = self.BufferedWriter(r)
2564 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2565 self.TextIOWrapper(b, encoding="hex")
2566
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002567 def test_detach(self):
2568 r = self.BytesIO()
2569 b = self.BufferedWriter(r)
2570 t = self.TextIOWrapper(b)
2571 self.assertIs(t.detach(), b)
2572
2573 t = self.TextIOWrapper(b, encoding="ascii")
2574 t.write("howdy")
2575 self.assertFalse(r.getvalue())
2576 t.detach()
2577 self.assertEqual(r.getvalue(), b"howdy")
2578 self.assertRaises(ValueError, t.detach)
2579
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002580 # Operations independent of the detached stream should still work
2581 repr(t)
2582 self.assertEqual(t.encoding, "ascii")
2583 self.assertEqual(t.errors, "strict")
2584 self.assertFalse(t.line_buffering)
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002585 self.assertFalse(t.write_through)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002586
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002587 def test_repr(self):
2588 raw = self.BytesIO("hello".encode("utf-8"))
2589 b = self.BufferedReader(raw)
2590 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002591 modname = self.TextIOWrapper.__module__
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002592 self.assertRegex(repr(t),
2593 r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002594 raw.name = "dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002595 self.assertRegex(repr(t),
2596 r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002597 t.mode = "r"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002598 self.assertRegex(repr(t),
2599 r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002600 raw.name = b"dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002601 self.assertRegex(repr(t),
2602 r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002603
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002604 t.buffer.detach()
2605 repr(t) # Should not raise an exception
2606
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02002607 def test_recursive_repr(self):
2608 # Issue #25455
2609 raw = self.BytesIO()
2610 t = self.TextIOWrapper(raw)
2611 with support.swap_attr(raw, 'name', t):
2612 try:
2613 repr(t) # Should not crash
2614 except RuntimeError:
2615 pass
2616
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002617 def test_line_buffering(self):
2618 r = self.BytesIO()
2619 b = self.BufferedWriter(r, 1000)
2620 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002621 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002622 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002623 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002624 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002625 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002626 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002627
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002628 def test_reconfigure_line_buffering(self):
2629 r = self.BytesIO()
2630 b = self.BufferedWriter(r, 1000)
2631 t = self.TextIOWrapper(b, newline="\n", line_buffering=False)
2632 t.write("AB\nC")
2633 self.assertEqual(r.getvalue(), b"")
2634
2635 t.reconfigure(line_buffering=True) # implicit flush
2636 self.assertEqual(r.getvalue(), b"AB\nC")
2637 t.write("DEF\nG")
2638 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2639 t.write("H")
2640 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2641 t.reconfigure(line_buffering=False) # implicit flush
2642 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2643 t.write("IJ")
2644 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2645
2646 # Keeping default value
2647 t.reconfigure()
2648 t.reconfigure(line_buffering=None)
2649 self.assertEqual(t.line_buffering, False)
2650 t.reconfigure(line_buffering=True)
2651 t.reconfigure()
2652 t.reconfigure(line_buffering=None)
2653 self.assertEqual(t.line_buffering, True)
2654
Victor Stinner91106cd2017-12-13 12:29:09 +01002655 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002656 def test_default_encoding(self):
2657 old_environ = dict(os.environ)
2658 try:
2659 # try to get a user preferred encoding different than the current
2660 # locale encoding to check that TextIOWrapper() uses the current
2661 # locale encoding and not the user preferred encoding
2662 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2663 if key in os.environ:
2664 del os.environ[key]
2665
2666 current_locale_encoding = locale.getpreferredencoding(False)
2667 b = self.BytesIO()
2668 t = self.TextIOWrapper(b)
2669 self.assertEqual(t.encoding, current_locale_encoding)
2670 finally:
2671 os.environ.clear()
2672 os.environ.update(old_environ)
2673
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002674 @support.cpython_only
Victor Stinner91106cd2017-12-13 12:29:09 +01002675 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Serhiy Storchaka78980432013-01-15 01:12:17 +02002676 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002677 # Issue 15989
2678 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002679 b = self.BytesIO()
2680 b.fileno = lambda: _testcapi.INT_MAX + 1
2681 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2682 b.fileno = lambda: _testcapi.UINT_MAX + 1
2683 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2684
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002685 def test_encoding(self):
2686 # Check the encoding attribute is always set, and valid
2687 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002688 t = self.TextIOWrapper(b, encoding="utf-8")
2689 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002690 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002691 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002692 codecs.lookup(t.encoding)
2693
2694 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002695 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002696 b = self.BytesIO(b"abc\n\xff\n")
2697 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002698 self.assertRaises(UnicodeError, t.read)
2699 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002700 b = self.BytesIO(b"abc\n\xff\n")
2701 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002702 self.assertRaises(UnicodeError, t.read)
2703 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002704 b = self.BytesIO(b"abc\n\xff\n")
2705 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002706 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002707 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002708 b = self.BytesIO(b"abc\n\xff\n")
2709 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002710 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002711
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002712 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002713 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002714 b = self.BytesIO()
2715 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002716 self.assertRaises(UnicodeError, t.write, "\xff")
2717 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002718 b = self.BytesIO()
2719 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002720 self.assertRaises(UnicodeError, t.write, "\xff")
2721 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002722 b = self.BytesIO()
2723 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002724 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002725 t.write("abc\xffdef\n")
2726 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002727 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002728 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002729 b = self.BytesIO()
2730 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002731 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002732 t.write("abc\xffdef\n")
2733 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002734 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002735
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002736 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002737 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2738
2739 tests = [
2740 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002741 [ '', input_lines ],
2742 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2743 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2744 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002745 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002746 encodings = (
2747 'utf-8', 'latin-1',
2748 'utf-16', 'utf-16-le', 'utf-16-be',
2749 'utf-32', 'utf-32-le', 'utf-32-be',
2750 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002751
Guido van Rossum8358db22007-08-18 21:39:55 +00002752 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002753 # character in TextIOWrapper._pending_line.
2754 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002755 # XXX: str.encode() should return bytes
2756 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002757 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002758 for bufsize in range(1, 10):
2759 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002760 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2761 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002762 encoding=encoding)
2763 if do_reads:
2764 got_lines = []
2765 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002766 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002767 if c2 == '':
2768 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002769 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002770 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002771 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002772 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002773
2774 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002775 self.assertEqual(got_line, exp_line)
2776 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002777
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002778 def test_newlines_input(self):
2779 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002780 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2781 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002782 (None, normalized.decode("ascii").splitlines(keepends=True)),
2783 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002784 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2785 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2786 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002787 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002788 buf = self.BytesIO(testdata)
2789 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002790 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002791 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002792 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002793
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002794 def test_newlines_output(self):
2795 testdict = {
2796 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2797 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2798 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2799 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2800 }
2801 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2802 for newline, expected in tests:
2803 buf = self.BytesIO()
2804 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2805 txt.write("AAA\nB")
2806 txt.write("BB\nCCC\n")
2807 txt.write("X\rY\r\nZ")
2808 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002809 self.assertEqual(buf.closed, False)
2810 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002811
2812 def test_destructor(self):
2813 l = []
2814 base = self.BytesIO
2815 class MyBytesIO(base):
2816 def close(self):
2817 l.append(self.getvalue())
2818 base.close(self)
2819 b = MyBytesIO()
2820 t = self.TextIOWrapper(b, encoding="ascii")
2821 t.write("abc")
2822 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002823 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002824 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002825
2826 def test_override_destructor(self):
2827 record = []
2828 class MyTextIO(self.TextIOWrapper):
2829 def __del__(self):
2830 record.append(1)
2831 try:
2832 f = super().__del__
2833 except AttributeError:
2834 pass
2835 else:
2836 f()
2837 def close(self):
2838 record.append(2)
2839 super().close()
2840 def flush(self):
2841 record.append(3)
2842 super().flush()
2843 b = self.BytesIO()
2844 t = MyTextIO(b, encoding="ascii")
2845 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002846 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002847 self.assertEqual(record, [1, 2, 3])
2848
2849 def test_error_through_destructor(self):
2850 # Test that the exception state is not modified by a destructor,
2851 # even if close() fails.
2852 rawio = self.CloseFailureIO()
Victor Stinner6dbbe742019-05-25 00:09:38 +02002853 with support.catch_unraisable_exception() as cm:
2854 with self.assertRaises(AttributeError):
2855 self.TextIOWrapper(rawio).xyzzy
Victor Stinnerbc2aa812019-05-23 03:45:09 +02002856
2857 if not IOBASE_EMITS_UNRAISABLE:
2858 self.assertIsNone(cm.unraisable)
2859 elif cm.unraisable is not None:
2860 self.assertEqual(cm.unraisable.exc_type, OSError)
Guido van Rossum8358db22007-08-18 21:39:55 +00002861
Guido van Rossum9b76da62007-04-11 01:09:03 +00002862 # Systematic tests of the text I/O API
2863
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002864 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002865 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002866 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002867 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002868 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002869 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002870 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002871 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002872 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002873 self.assertEqual(f.tell(), 0)
2874 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002875 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002876 self.assertEqual(f.seek(0), 0)
2877 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002878 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002879 self.assertEqual(f.read(2), "ab")
2880 self.assertEqual(f.read(1), "c")
2881 self.assertEqual(f.read(1), "")
2882 self.assertEqual(f.read(), "")
2883 self.assertEqual(f.tell(), cookie)
2884 self.assertEqual(f.seek(0), 0)
2885 self.assertEqual(f.seek(0, 2), cookie)
2886 self.assertEqual(f.write("def"), 3)
2887 self.assertEqual(f.seek(cookie), cookie)
2888 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002889 if enc.startswith("utf"):
2890 self.multi_line_test(f, enc)
2891 f.close()
2892
2893 def multi_line_test(self, f, enc):
2894 f.seek(0)
2895 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002896 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002897 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002898 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002899 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002900 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002901 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002902 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002903 wlines.append((f.tell(), line))
2904 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002905 f.seek(0)
2906 rlines = []
2907 while True:
2908 pos = f.tell()
2909 line = f.readline()
2910 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002911 break
2912 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002913 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002914
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002915 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002916 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002917 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002918 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002919 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002920 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002921 p2 = f.tell()
2922 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002923 self.assertEqual(f.tell(), p0)
2924 self.assertEqual(f.readline(), "\xff\n")
2925 self.assertEqual(f.tell(), p1)
2926 self.assertEqual(f.readline(), "\xff\n")
2927 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002928 f.seek(0)
2929 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002930 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002931 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002932 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002933 f.close()
2934
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002935 def test_seeking(self):
2936 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002937 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002938 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002939 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002940 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002941 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002942 suffix = bytes(u_suffix.encode("utf-8"))
2943 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002944 with self.open(support.TESTFN, "wb") as f:
2945 f.write(line*2)
2946 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2947 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002948 self.assertEqual(s, str(prefix, "ascii"))
2949 self.assertEqual(f.tell(), prefix_size)
2950 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002951
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002952 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002953 # Regression test for a specific bug
2954 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002955 with self.open(support.TESTFN, "wb") as f:
2956 f.write(data)
2957 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2958 f._CHUNK_SIZE # Just test that it exists
2959 f._CHUNK_SIZE = 2
2960 f.readline()
2961 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002962
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002963 def test_seek_and_tell(self):
2964 #Test seek/tell using the StatefulIncrementalDecoder.
2965 # Make test faster by doing smaller seeks
2966 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002967
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002968 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002969 """Tell/seek to various points within a data stream and ensure
2970 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002971 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002972 f.write(data)
2973 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002974 f = self.open(support.TESTFN, encoding='test_decoder')
2975 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002976 decoded = f.read()
2977 f.close()
2978
Neal Norwitze2b07052008-03-18 19:52:05 +00002979 for i in range(min_pos, len(decoded) + 1): # seek positions
2980 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002981 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002982 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002983 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002984 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002985 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002986 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002987 f.close()
2988
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002989 # Enable the test decoder.
2990 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002991
2992 # Run the tests.
2993 try:
2994 # Try each test case.
2995 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002996 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002997
2998 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002999 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
3000 offset = CHUNK_SIZE - len(input)//2
3001 prefix = b'.'*offset
3002 # Don't bother seeking into the prefix (takes too long).
3003 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00003004 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003005
3006 # Ensure our test decoder won't interfere with subsequent tests.
3007 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00003008 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003009
Christopher Thorneac22f6a2018-11-01 10:48:49 +00003010 def test_multibyte_seek_and_tell(self):
3011 f = self.open(support.TESTFN, "w", encoding="euc_jp")
3012 f.write("AB\n\u3046\u3048\n")
3013 f.close()
3014
3015 f = self.open(support.TESTFN, "r", encoding="euc_jp")
3016 self.assertEqual(f.readline(), "AB\n")
3017 p0 = f.tell()
3018 self.assertEqual(f.readline(), "\u3046\u3048\n")
3019 p1 = f.tell()
3020 f.seek(p0)
3021 self.assertEqual(f.readline(), "\u3046\u3048\n")
3022 self.assertEqual(f.tell(), p1)
3023 f.close()
3024
3025 def test_seek_with_encoder_state(self):
3026 f = self.open(support.TESTFN, "w", encoding="euc_jis_2004")
3027 f.write("\u00e6\u0300")
3028 p0 = f.tell()
3029 f.write("\u00e6")
3030 f.seek(p0)
3031 f.write("\u0300")
3032 f.close()
3033
3034 f = self.open(support.TESTFN, "r", encoding="euc_jis_2004")
3035 self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
3036 f.close()
3037
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003038 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003039 data = "1234567890"
3040 tests = ("utf-16",
3041 "utf-16-le",
3042 "utf-16-be",
3043 "utf-32",
3044 "utf-32-le",
3045 "utf-32-be")
3046 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003047 buf = self.BytesIO()
3048 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003049 # Check if the BOM is written only once (see issue1753).
3050 f.write(data)
3051 f.write(data)
3052 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003053 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00003054 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003055 self.assertEqual(f.read(), data * 2)
3056 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003057
Benjamin Petersona1b49012009-03-31 23:11:32 +00003058 def test_unreadable(self):
3059 class UnReadable(self.BytesIO):
3060 def readable(self):
3061 return False
3062 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003063 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00003064
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003065 def test_read_one_by_one(self):
3066 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003067 reads = ""
3068 while True:
3069 c = txt.read(1)
3070 if not c:
3071 break
3072 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003073 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003074
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00003075 def test_readlines(self):
3076 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
3077 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
3078 txt.seek(0)
3079 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
3080 txt.seek(0)
3081 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
3082
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003083 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003084 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003085 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003086 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003087 reads = ""
3088 while True:
3089 c = txt.read(128)
3090 if not c:
3091 break
3092 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003093 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003094
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02003095 def test_writelines(self):
3096 l = ['ab', 'cd', 'ef']
3097 buf = self.BytesIO()
3098 txt = self.TextIOWrapper(buf)
3099 txt.writelines(l)
3100 txt.flush()
3101 self.assertEqual(buf.getvalue(), b'abcdef')
3102
3103 def test_writelines_userlist(self):
3104 l = UserList(['ab', 'cd', 'ef'])
3105 buf = self.BytesIO()
3106 txt = self.TextIOWrapper(buf)
3107 txt.writelines(l)
3108 txt.flush()
3109 self.assertEqual(buf.getvalue(), b'abcdef')
3110
3111 def test_writelines_error(self):
3112 txt = self.TextIOWrapper(self.BytesIO())
3113 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3114 self.assertRaises(TypeError, txt.writelines, None)
3115 self.assertRaises(TypeError, txt.writelines, b'abc')
3116
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003117 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003118 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003119
3120 # read one char at a time
3121 reads = ""
3122 while True:
3123 c = txt.read(1)
3124 if not c:
3125 break
3126 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003127 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003128
3129 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003130 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003131 txt._CHUNK_SIZE = 4
3132
3133 reads = ""
3134 while True:
3135 c = txt.read(4)
3136 if not c:
3137 break
3138 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003139 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003140
3141 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003142 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003143 txt._CHUNK_SIZE = 4
3144
3145 reads = txt.read(4)
3146 reads += txt.read(4)
3147 reads += txt.readline()
3148 reads += txt.readline()
3149 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003150 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003151
3152 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003153 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003154 txt._CHUNK_SIZE = 4
3155
3156 reads = txt.read(4)
3157 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003158 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003159
3160 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003161 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003162 txt._CHUNK_SIZE = 4
3163
3164 reads = txt.read(4)
3165 pos = txt.tell()
3166 txt.seek(0)
3167 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003168 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003169
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003170 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003171 buffer = self.BytesIO(self.testdata)
3172 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003173
3174 self.assertEqual(buffer.seekable(), txt.seekable())
3175
Antoine Pitroue4501852009-05-14 18:55:55 +00003176 def test_append_bom(self):
3177 # The BOM is not written again when appending to a non-empty file
3178 filename = support.TESTFN
3179 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3180 with self.open(filename, 'w', encoding=charset) as f:
3181 f.write('aaa')
3182 pos = f.tell()
3183 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003184 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003185
3186 with self.open(filename, 'a', encoding=charset) as f:
3187 f.write('xxx')
3188 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003189 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003190
3191 def test_seek_bom(self):
3192 # Same test, but when seeking manually
3193 filename = support.TESTFN
3194 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3195 with self.open(filename, 'w', encoding=charset) as f:
3196 f.write('aaa')
3197 pos = f.tell()
3198 with self.open(filename, 'r+', encoding=charset) as f:
3199 f.seek(pos)
3200 f.write('zzz')
3201 f.seek(0)
3202 f.write('bbb')
3203 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003204 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003205
Antoine Pitrou85e3ee72015-04-13 20:01:21 +02003206 def test_seek_append_bom(self):
3207 # Same test, but first seek to the start and then to the end
3208 filename = support.TESTFN
3209 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3210 with self.open(filename, 'w', encoding=charset) as f:
3211 f.write('aaa')
3212 with self.open(filename, 'a', encoding=charset) as f:
3213 f.seek(0)
3214 f.seek(0, self.SEEK_END)
3215 f.write('xxx')
3216 with self.open(filename, 'rb') as f:
3217 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3218
Benjamin Peterson0926ad12009-06-06 18:02:12 +00003219 def test_errors_property(self):
3220 with self.open(support.TESTFN, "w") as f:
3221 self.assertEqual(f.errors, "strict")
3222 with self.open(support.TESTFN, "w", errors="replace") as f:
3223 self.assertEqual(f.errors, "replace")
3224
Brett Cannon31f59292011-02-21 19:29:56 +00003225 @support.no_tracing
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003226 def test_threads_write(self):
3227 # Issue6750: concurrent writes could duplicate data
3228 event = threading.Event()
3229 with self.open(support.TESTFN, "w", buffering=1) as f:
3230 def run(n):
3231 text = "Thread%03d\n" % n
3232 event.wait()
3233 f.write(text)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003234 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003235 for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003236 with support.start_threads(threads, event.set):
3237 time.sleep(0.02)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003238 with self.open(support.TESTFN) as f:
3239 content = f.read()
3240 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003241 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003242
Antoine Pitrou6be88762010-05-03 16:48:20 +00003243 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003244 # Test that text file is closed despite failed flush
3245 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00003246 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003247 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00003248 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003249 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003250 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00003251 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003252 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06003253 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003254 self.assertTrue(txt.buffer.closed)
3255 self.assertTrue(closed) # flush() called
3256 self.assertFalse(closed[0]) # flush() called before file closed
3257 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02003258 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003259
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003260 def test_close_error_on_close(self):
3261 buffer = self.BytesIO(self.testdata)
3262 def bad_flush():
3263 raise OSError('flush')
3264 def bad_close():
3265 raise OSError('close')
3266 buffer.close = bad_close
3267 txt = self.TextIOWrapper(buffer, encoding="ascii")
3268 txt.flush = bad_flush
3269 with self.assertRaises(OSError) as err: # exception not swallowed
3270 txt.close()
3271 self.assertEqual(err.exception.args, ('close',))
3272 self.assertIsInstance(err.exception.__context__, OSError)
3273 self.assertEqual(err.exception.__context__.args, ('flush',))
3274 self.assertFalse(txt.closed)
3275
Victor Stinner472f7942019-04-12 21:58:24 +02003276 # Silence destructor error
3277 buffer.close = lambda: None
3278 txt.flush = lambda: None
3279
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003280 def test_nonnormalized_close_error_on_close(self):
3281 # Issue #21677
3282 buffer = self.BytesIO(self.testdata)
3283 def bad_flush():
3284 raise non_existing_flush
3285 def bad_close():
3286 raise non_existing_close
3287 buffer.close = bad_close
3288 txt = self.TextIOWrapper(buffer, encoding="ascii")
3289 txt.flush = bad_flush
3290 with self.assertRaises(NameError) as err: # exception not swallowed
3291 txt.close()
3292 self.assertIn('non_existing_close', str(err.exception))
3293 self.assertIsInstance(err.exception.__context__, NameError)
3294 self.assertIn('non_existing_flush', str(err.exception.__context__))
3295 self.assertFalse(txt.closed)
3296
Victor Stinner472f7942019-04-12 21:58:24 +02003297 # Silence destructor error
3298 buffer.close = lambda: None
3299 txt.flush = lambda: None
3300
Antoine Pitrou6be88762010-05-03 16:48:20 +00003301 def test_multi_close(self):
3302 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3303 txt.close()
3304 txt.close()
3305 txt.close()
3306 self.assertRaises(ValueError, txt.flush)
3307
Antoine Pitrou0d739d72010-09-05 23:01:12 +00003308 def test_unseekable(self):
3309 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
3310 self.assertRaises(self.UnsupportedOperation, txt.tell)
3311 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3312
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003313 def test_readonly_attributes(self):
3314 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3315 buf = self.BytesIO(self.testdata)
3316 with self.assertRaises(AttributeError):
3317 txt.buffer = buf
3318
Antoine Pitroue96ec682011-07-23 21:46:35 +02003319 def test_rawio(self):
3320 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3321 # that subprocess.Popen() can have the required unbuffered
3322 # semantics with universal_newlines=True.
3323 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3324 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3325 # Reads
3326 self.assertEqual(txt.read(4), 'abcd')
3327 self.assertEqual(txt.readline(), 'efghi\n')
3328 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3329
3330 def test_rawio_write_through(self):
3331 # Issue #12591: with write_through=True, writes don't need a flush
3332 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3333 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3334 write_through=True)
3335 txt.write('1')
3336 txt.write('23\n4')
3337 txt.write('5')
3338 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3339
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003340 def test_bufio_write_through(self):
3341 # Issue #21396: write_through=True doesn't force a flush()
3342 # on the underlying binary buffered object.
3343 flush_called, write_called = [], []
3344 class BufferedWriter(self.BufferedWriter):
3345 def flush(self, *args, **kwargs):
3346 flush_called.append(True)
3347 return super().flush(*args, **kwargs)
3348 def write(self, *args, **kwargs):
3349 write_called.append(True)
3350 return super().write(*args, **kwargs)
3351
3352 rawio = self.BytesIO()
3353 data = b"a"
3354 bufio = BufferedWriter(rawio, len(data)*2)
3355 textio = self.TextIOWrapper(bufio, encoding='ascii',
3356 write_through=True)
3357 # write to the buffered io but don't overflow the buffer
3358 text = data.decode('ascii')
3359 textio.write(text)
3360
3361 # buffer.flush is not called with write_through=True
3362 self.assertFalse(flush_called)
3363 # buffer.write *is* called with write_through=True
3364 self.assertTrue(write_called)
3365 self.assertEqual(rawio.getvalue(), b"") # no flush
3366
3367 write_called = [] # reset
3368 textio.write(text * 10) # total content is larger than bufio buffer
3369 self.assertTrue(write_called)
3370 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3371
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02003372 def test_reconfigure_write_through(self):
3373 raw = self.MockRawIO([])
3374 t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3375 t.write('1')
3376 t.reconfigure(write_through=True) # implied flush
3377 self.assertEqual(t.write_through, True)
3378 self.assertEqual(b''.join(raw._write_stack), b'1')
3379 t.write('23')
3380 self.assertEqual(b''.join(raw._write_stack), b'123')
3381 t.reconfigure(write_through=False)
3382 self.assertEqual(t.write_through, False)
3383 t.write('45')
3384 t.flush()
3385 self.assertEqual(b''.join(raw._write_stack), b'12345')
3386 # Keeping default value
3387 t.reconfigure()
3388 t.reconfigure(write_through=None)
3389 self.assertEqual(t.write_through, False)
3390 t.reconfigure(write_through=True)
3391 t.reconfigure()
3392 t.reconfigure(write_through=None)
3393 self.assertEqual(t.write_through, True)
3394
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003395 def test_read_nonbytes(self):
3396 # Issue #17106
3397 # Crash when underlying read() returns non-bytes
3398 t = self.TextIOWrapper(self.StringIO('a'))
3399 self.assertRaises(TypeError, t.read, 1)
3400 t = self.TextIOWrapper(self.StringIO('a'))
3401 self.assertRaises(TypeError, t.readline)
3402 t = self.TextIOWrapper(self.StringIO('a'))
3403 self.assertRaises(TypeError, t.read)
3404
Oren Milmana5b4ea12017-08-25 21:14:54 +03003405 def test_illegal_encoder(self):
3406 # Issue 31271: Calling write() while the return value of encoder's
3407 # encode() is invalid shouldn't cause an assertion failure.
3408 rot13 = codecs.lookup("rot13")
3409 with support.swap_attr(rot13, '_is_text_encoding', True):
3410 t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
3411 self.assertRaises(TypeError, t.write, 'bar')
3412
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003413 def test_illegal_decoder(self):
3414 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10003415 # Bypass the early encoding check added in issue 20404
3416 def _make_illegal_wrapper():
3417 quopri = codecs.lookup("quopri")
3418 quopri._is_text_encoding = True
3419 try:
3420 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3421 newline='\n', encoding="quopri")
3422 finally:
3423 quopri._is_text_encoding = False
3424 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003425 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10003426 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003427 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10003428 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003429 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10003430 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003431 self.assertRaises(TypeError, t.read)
3432
Oren Milmanba7d7362017-08-29 11:58:27 +03003433 # Issue 31243: calling read() while the return value of decoder's
3434 # getstate() is invalid should neither crash the interpreter nor
3435 # raise a SystemError.
3436 def _make_very_illegal_wrapper(getstate_ret_val):
3437 class BadDecoder:
3438 def getstate(self):
3439 return getstate_ret_val
3440 def _get_bad_decoder(dummy):
3441 return BadDecoder()
3442 quopri = codecs.lookup("quopri")
3443 with support.swap_attr(quopri, 'incrementaldecoder',
3444 _get_bad_decoder):
3445 return _make_illegal_wrapper()
3446 t = _make_very_illegal_wrapper(42)
3447 self.assertRaises(TypeError, t.read, 42)
3448 t = _make_very_illegal_wrapper(())
3449 self.assertRaises(TypeError, t.read, 42)
3450 t = _make_very_illegal_wrapper((1, 2))
3451 self.assertRaises(TypeError, t.read, 42)
3452
Antoine Pitrou712cb732013-12-21 15:51:54 +01003453 def _check_create_at_shutdown(self, **kwargs):
3454 # Issue #20037: creating a TextIOWrapper at shutdown
3455 # shouldn't crash the interpreter.
3456 iomod = self.io.__name__
3457 code = """if 1:
3458 import codecs
3459 import {iomod} as io
3460
3461 # Avoid looking up codecs at shutdown
3462 codecs.lookup('utf-8')
3463
3464 class C:
3465 def __init__(self):
3466 self.buf = io.BytesIO()
3467 def __del__(self):
3468 io.TextIOWrapper(self.buf, **{kwargs})
3469 print("ok")
3470 c = C()
3471 """.format(iomod=iomod, kwargs=kwargs)
3472 return assert_python_ok("-c", code)
3473
Serhiy Storchakaa7930372016-07-03 22:27:26 +03003474 @support.requires_type_collecting
Antoine Pitrou712cb732013-12-21 15:51:54 +01003475 def test_create_at_shutdown_without_encoding(self):
3476 rc, out, err = self._check_create_at_shutdown()
3477 if err:
3478 # Can error out with a RuntimeError if the module state
3479 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10003480 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01003481 else:
3482 self.assertEqual("ok", out.decode().strip())
3483
Serhiy Storchakaa7930372016-07-03 22:27:26 +03003484 @support.requires_type_collecting
Antoine Pitrou712cb732013-12-21 15:51:54 +01003485 def test_create_at_shutdown_with_encoding(self):
3486 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3487 errors='strict')
3488 self.assertFalse(err)
3489 self.assertEqual("ok", out.decode().strip())
3490
Antoine Pitroub8503892014-04-29 10:14:02 +02003491 def test_read_byteslike(self):
3492 r = MemviewBytesIO(b'Just some random string\n')
3493 t = self.TextIOWrapper(r, 'utf-8')
3494
3495 # TextIOwrapper will not read the full string, because
3496 # we truncate it to a multiple of the native int size
3497 # so that we can construct a more complex memoryview.
3498 bytes_val = _to_memoryview(r.getvalue()).tobytes()
3499
3500 self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3501
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003502 def test_issue22849(self):
3503 class F(object):
3504 def readable(self): return True
3505 def writable(self): return True
3506 def seekable(self): return True
3507
3508 for i in range(10):
3509 try:
3510 self.TextIOWrapper(F(), encoding='utf-8')
3511 except Exception:
3512 pass
3513
3514 F.tell = lambda x: 0
3515 t = self.TextIOWrapper(F(), encoding='utf-8')
3516
INADA Naoki507434f2017-12-21 09:59:53 +09003517 def test_reconfigure_encoding_read(self):
3518 # latin1 -> utf8
3519 # (latin1 can decode utf-8 encoded string)
3520 data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3521 raw = self.BytesIO(data)
3522 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3523 self.assertEqual(txt.readline(), 'abc\xe9\n')
3524 with self.assertRaises(self.UnsupportedOperation):
3525 txt.reconfigure(encoding='utf-8')
3526 with self.assertRaises(self.UnsupportedOperation):
3527 txt.reconfigure(newline=None)
3528
3529 def test_reconfigure_write_fromascii(self):
3530 # ascii has a specific encodefunc in the C implementation,
3531 # but utf-8-sig has not. Make sure that we get rid of the
3532 # cached encodefunc when we switch encoders.
3533 raw = self.BytesIO()
3534 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3535 txt.write('foo\n')
3536 txt.reconfigure(encoding='utf-8-sig')
3537 txt.write('\xe9\n')
3538 txt.flush()
3539 self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3540
3541 def test_reconfigure_write(self):
3542 # latin -> utf8
3543 raw = self.BytesIO()
3544 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3545 txt.write('abc\xe9\n')
3546 txt.reconfigure(encoding='utf-8')
3547 self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3548 txt.write('d\xe9f\n')
3549 txt.flush()
3550 self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3551
3552 # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3553 # the file
3554 raw = self.BytesIO()
3555 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3556 txt.write('abc\n')
3557 txt.reconfigure(encoding='utf-8-sig')
3558 txt.write('d\xe9f\n')
3559 txt.flush()
3560 self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3561
3562 def test_reconfigure_write_non_seekable(self):
3563 raw = self.BytesIO()
3564 raw.seekable = lambda: False
3565 raw.seek = None
3566 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3567 txt.write('abc\n')
3568 txt.reconfigure(encoding='utf-8-sig')
3569 txt.write('d\xe9f\n')
3570 txt.flush()
3571
3572 # If the raw stream is not seekable, there'll be a BOM
3573 self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3574
3575 def test_reconfigure_defaults(self):
3576 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
3577 txt.reconfigure(encoding=None)
3578 self.assertEqual(txt.encoding, 'ascii')
3579 self.assertEqual(txt.errors, 'replace')
3580 txt.write('LF\n')
3581
3582 txt.reconfigure(newline='\r\n')
3583 self.assertEqual(txt.encoding, 'ascii')
3584 self.assertEqual(txt.errors, 'replace')
3585
3586 txt.reconfigure(errors='ignore')
3587 self.assertEqual(txt.encoding, 'ascii')
3588 self.assertEqual(txt.errors, 'ignore')
3589 txt.write('CRLF\n')
3590
3591 txt.reconfigure(encoding='utf-8', newline=None)
3592 self.assertEqual(txt.errors, 'strict')
3593 txt.seek(0)
3594 self.assertEqual(txt.read(), 'LF\nCRLF\n')
3595
3596 self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3597
3598 def test_reconfigure_newline(self):
3599 raw = self.BytesIO(b'CR\rEOF')
3600 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3601 txt.reconfigure(newline=None)
3602 self.assertEqual(txt.readline(), 'CR\n')
3603 raw = self.BytesIO(b'CR\rEOF')
3604 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3605 txt.reconfigure(newline='')
3606 self.assertEqual(txt.readline(), 'CR\r')
3607 raw = self.BytesIO(b'CR\rLF\nEOF')
3608 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3609 txt.reconfigure(newline='\n')
3610 self.assertEqual(txt.readline(), 'CR\rLF\n')
3611 raw = self.BytesIO(b'LF\nCR\rEOF')
3612 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3613 txt.reconfigure(newline='\r')
3614 self.assertEqual(txt.readline(), 'LF\nCR\r')
3615 raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3616 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3617 txt.reconfigure(newline='\r\n')
3618 self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3619
3620 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3621 txt.reconfigure(newline=None)
3622 txt.write('linesep\n')
3623 txt.reconfigure(newline='')
3624 txt.write('LF\n')
3625 txt.reconfigure(newline='\n')
3626 txt.write('LF\n')
3627 txt.reconfigure(newline='\r')
3628 txt.write('CR\n')
3629 txt.reconfigure(newline='\r\n')
3630 txt.write('CRLF\n')
3631 expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3632 self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3633
Zackery Spytz23db9352018-06-29 04:14:58 -06003634 def test_issue25862(self):
3635 # Assertion failures occurred in tell() after read() and write().
3636 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3637 t.read(1)
3638 t.read()
3639 t.tell()
3640 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3641 t.read(1)
3642 t.write('x')
3643 t.tell()
3644
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003645
Antoine Pitroub8503892014-04-29 10:14:02 +02003646class MemviewBytesIO(io.BytesIO):
3647 '''A BytesIO object whose read method returns memoryviews
3648 rather than bytes'''
3649
3650 def read1(self, len_):
3651 return _to_memoryview(super().read1(len_))
3652
3653 def read(self, len_):
3654 return _to_memoryview(super().read(len_))
3655
3656def _to_memoryview(buf):
3657 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3658
3659 arr = array.array('i')
3660 idx = len(buf) - len(buf) % arr.itemsize
3661 arr.frombytes(buf[:idx])
3662 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003663
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003664
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003665class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003666 io = io
Steve Dowerfe8f4c92016-09-07 09:31:52 -07003667 shutdown_error = "RuntimeError: could not find io module state"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003668
3669 def test_initialization(self):
3670 r = self.BytesIO(b"\xc3\xa9\n\n")
3671 b = self.BufferedReader(r, 1000)
3672 t = self.TextIOWrapper(b)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003673 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
3674 self.assertRaises(ValueError, t.read)
3675
Benjamin Peterson10e76b62014-12-21 20:51:50 -06003676 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
3677 self.assertRaises(Exception, repr, t)
3678
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003679 def test_garbage_collection(self):
3680 # C TextIOWrapper objects are collected, and collecting them flushes
3681 # all data to disk.
3682 # The Python version has __del__, so it ends in gc.garbage instead.
Antoine Pitrou796564c2013-07-30 19:59:21 +02003683 with support.check_warnings(('', ResourceWarning)):
3684 rawio = io.FileIO(support.TESTFN, "wb")
3685 b = self.BufferedWriter(rawio)
3686 t = self.TextIOWrapper(b, encoding="ascii")
3687 t.write("456def")
3688 t.x = t
3689 wr = weakref.ref(t)
3690 del t
3691 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003692 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00003693 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003694 self.assertEqual(f.read(), b"456def")
3695
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003696 def test_rwpair_cleared_before_textio(self):
3697 # Issue 13070: TextIOWrapper's finalization would crash when called
3698 # after the reference to the underlying BufferedRWPair's writer got
3699 # cleared by the GC.
3700 for i in range(1000):
3701 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3702 t1 = self.TextIOWrapper(b1, encoding="ascii")
3703 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3704 t2 = self.TextIOWrapper(b2, encoding="ascii")
3705 # circular references
3706 t1.buddy = t2
3707 t2.buddy = t1
3708 support.gc_collect()
3709
Zackery Spytz842acaa2018-12-17 07:52:45 -07003710 def test_del__CHUNK_SIZE_SystemError(self):
3711 t = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
3712 with self.assertRaises(AttributeError):
3713 del t._CHUNK_SIZE
3714
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003715
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003716class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003717 io = pyio
Steve Dower6d5cf8a2016-09-09 09:21:01 -07003718 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003719
3720
3721class IncrementalNewlineDecoderTest(unittest.TestCase):
3722
3723 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003724 # UTF-8 specific tests for a newline decoder
3725 def _check_decode(b, s, **kwargs):
3726 # We exercise getstate() / setstate() as well as decode()
3727 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003728 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003729 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003730 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003731
Antoine Pitrou180a3362008-12-14 16:36:46 +00003732 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003733
Antoine Pitrou180a3362008-12-14 16:36:46 +00003734 _check_decode(b'\xe8', "")
3735 _check_decode(b'\xa2', "")
3736 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003737
Antoine Pitrou180a3362008-12-14 16:36:46 +00003738 _check_decode(b'\xe8', "")
3739 _check_decode(b'\xa2', "")
3740 _check_decode(b'\x88', "\u8888")
3741
3742 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003743 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3744
Antoine Pitrou180a3362008-12-14 16:36:46 +00003745 decoder.reset()
3746 _check_decode(b'\n', "\n")
3747 _check_decode(b'\r', "")
3748 _check_decode(b'', "\n", final=True)
3749 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003750
Antoine Pitrou180a3362008-12-14 16:36:46 +00003751 _check_decode(b'\r', "")
3752 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003753
Antoine Pitrou180a3362008-12-14 16:36:46 +00003754 _check_decode(b'\r\r\n', "\n\n")
3755 _check_decode(b'\r', "")
3756 _check_decode(b'\r', "\n")
3757 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003758
Antoine Pitrou180a3362008-12-14 16:36:46 +00003759 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3760 _check_decode(b'\xe8\xa2\x88', "\u8888")
3761 _check_decode(b'\n', "\n")
3762 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3763 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003764
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003765 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003766 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003767 if encoding is not None:
3768 encoder = codecs.getincrementalencoder(encoding)()
3769 def _decode_bytewise(s):
3770 # Decode one byte at a time
3771 for b in encoder.encode(s):
3772 result.append(decoder.decode(bytes([b])))
3773 else:
3774 encoder = None
3775 def _decode_bytewise(s):
3776 # Decode one char at a time
3777 for c in s:
3778 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003779 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003780 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003781 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003782 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003783 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003784 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003785 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003786 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003787 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003788 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003789 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003790 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003791 input = "abc"
3792 if encoder is not None:
3793 encoder.reset()
3794 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003795 self.assertEqual(decoder.decode(input), "abc")
3796 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003797
3798 def test_newline_decoder(self):
3799 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003800 # None meaning the IncrementalNewlineDecoder takes unicode input
3801 # rather than bytes input
3802 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003803 'utf-16', 'utf-16-le', 'utf-16-be',
3804 'utf-32', 'utf-32-le', 'utf-32-be',
3805 )
3806 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003807 decoder = enc and codecs.getincrementaldecoder(enc)()
3808 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3809 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003810 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003811 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3812 self.check_newline_decoding_utf8(decoder)
Oren Milman1d1d3e92017-08-20 18:35:36 +03003813 self.assertRaises(TypeError, decoder.setstate, 42)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003814
Antoine Pitrou66913e22009-03-06 23:40:56 +00003815 def test_newline_bytes(self):
3816 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3817 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003818 self.assertEqual(dec.newlines, None)
3819 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3820 self.assertEqual(dec.newlines, None)
3821 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3822 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003823 dec = self.IncrementalNewlineDecoder(None, translate=False)
3824 _check(dec)
3825 dec = self.IncrementalNewlineDecoder(None, translate=True)
3826 _check(dec)
3827
Xiang Zhangb08746b2018-10-31 19:49:16 +08003828 def test_translate(self):
3829 # issue 35062
3830 for translate in (-2, -1, 1, 2):
3831 decoder = codecs.getincrementaldecoder("utf-8")()
3832 decoder = self.IncrementalNewlineDecoder(decoder, translate)
3833 self.check_newline_decoding_utf8(decoder)
3834 decoder = codecs.getincrementaldecoder("utf-8")()
3835 decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
3836 self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
3837
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003838class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3839 pass
3840
3841class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3842 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003843
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003844
Guido van Rossum01a27522007-03-07 01:00:12 +00003845# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003846
Guido van Rossum5abbf752007-08-27 17:39:33 +00003847class MiscIOTest(unittest.TestCase):
3848
Barry Warsaw40e82462008-11-20 20:14:50 +00003849 def tearDown(self):
3850 support.unlink(support.TESTFN)
3851
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003852 def test___all__(self):
3853 for name in self.io.__all__:
3854 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003855 self.assertIsNotNone(obj, name)
Steve Dowerb82e17e2019-05-23 08:45:22 -07003856 if name in ("open", "open_code"):
Guido van Rossum5abbf752007-08-27 17:39:33 +00003857 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003858 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003859 self.assertTrue(issubclass(obj, Exception), name)
3860 elif not name.startswith("SEEK_"):
3861 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003862
Barry Warsaw40e82462008-11-20 20:14:50 +00003863 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003864 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003865 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003866 f.close()
3867
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003868 with support.check_warnings(('', DeprecationWarning)):
3869 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003870 self.assertEqual(f.name, support.TESTFN)
3871 self.assertEqual(f.buffer.name, support.TESTFN)
3872 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3873 self.assertEqual(f.mode, "U")
3874 self.assertEqual(f.buffer.mode, "rb")
3875 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003876 f.close()
3877
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003878 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003879 self.assertEqual(f.mode, "w+")
3880 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3881 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003882
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003883 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003884 self.assertEqual(g.mode, "wb")
3885 self.assertEqual(g.raw.mode, "wb")
3886 self.assertEqual(g.name, f.fileno())
3887 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003888 f.close()
3889 g.close()
3890
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003891 def test_io_after_close(self):
3892 for kwargs in [
3893 {"mode": "w"},
3894 {"mode": "wb"},
3895 {"mode": "w", "buffering": 1},
3896 {"mode": "w", "buffering": 2},
3897 {"mode": "wb", "buffering": 0},
3898 {"mode": "r"},
3899 {"mode": "rb"},
3900 {"mode": "r", "buffering": 1},
3901 {"mode": "r", "buffering": 2},
3902 {"mode": "rb", "buffering": 0},
3903 {"mode": "w+"},
3904 {"mode": "w+b"},
3905 {"mode": "w+", "buffering": 1},
3906 {"mode": "w+", "buffering": 2},
3907 {"mode": "w+b", "buffering": 0},
3908 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003909 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003910 f.close()
3911 self.assertRaises(ValueError, f.flush)
3912 self.assertRaises(ValueError, f.fileno)
3913 self.assertRaises(ValueError, f.isatty)
3914 self.assertRaises(ValueError, f.__iter__)
3915 if hasattr(f, "peek"):
3916 self.assertRaises(ValueError, f.peek, 1)
3917 self.assertRaises(ValueError, f.read)
3918 if hasattr(f, "read1"):
3919 self.assertRaises(ValueError, f.read1, 1024)
Martin Panterccb2c0e2016-10-20 23:48:14 +00003920 self.assertRaises(ValueError, f.read1)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003921 if hasattr(f, "readall"):
3922 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003923 if hasattr(f, "readinto"):
3924 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003925 if hasattr(f, "readinto1"):
3926 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003927 self.assertRaises(ValueError, f.readline)
3928 self.assertRaises(ValueError, f.readlines)
Xiang Zhang026435c2017-04-15 12:47:28 +08003929 self.assertRaises(ValueError, f.readlines, 1)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003930 self.assertRaises(ValueError, f.seek, 0)
3931 self.assertRaises(ValueError, f.tell)
3932 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003933 self.assertRaises(ValueError, f.write,
3934 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003935 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003936 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003937
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003938 def test_blockingioerror(self):
3939 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003940 class C(str):
3941 pass
3942 c = C("")
3943 b = self.BlockingIOError(1, c)
3944 c.b = b
3945 b.c = c
3946 wr = weakref.ref(c)
3947 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003948 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003949 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003950
3951 def test_abcs(self):
3952 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003953 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3954 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3955 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3956 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003957
3958 def _check_abc_inheritance(self, abcmodule):
3959 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003960 self.assertIsInstance(f, abcmodule.IOBase)
3961 self.assertIsInstance(f, abcmodule.RawIOBase)
3962 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3963 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003964 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003965 self.assertIsInstance(f, abcmodule.IOBase)
3966 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3967 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3968 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003969 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003970 self.assertIsInstance(f, abcmodule.IOBase)
3971 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3972 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3973 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003974
3975 def test_abc_inheritance(self):
3976 # Test implementations inherit from their respective ABCs
3977 self._check_abc_inheritance(self)
3978
3979 def test_abc_inheritance_official(self):
3980 # Test implementations inherit from the official ABCs of the
3981 # baseline "io" module.
3982 self._check_abc_inheritance(io)
3983
Antoine Pitroue033e062010-10-29 10:38:18 +00003984 def _check_warn_on_dealloc(self, *args, **kwargs):
3985 f = open(*args, **kwargs)
3986 r = repr(f)
3987 with self.assertWarns(ResourceWarning) as cm:
3988 f = None
3989 support.gc_collect()
3990 self.assertIn(r, str(cm.warning.args[0]))
3991
3992 def test_warn_on_dealloc(self):
3993 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3994 self._check_warn_on_dealloc(support.TESTFN, "wb")
3995 self._check_warn_on_dealloc(support.TESTFN, "w")
3996
3997 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3998 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003999 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00004000 for fd in fds:
4001 try:
4002 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02004003 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00004004 if e.errno != errno.EBADF:
4005 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00004006 self.addCleanup(cleanup_fds)
4007 r, w = os.pipe()
4008 fds += r, w
4009 self._check_warn_on_dealloc(r, *args, **kwargs)
4010 # When using closefd=False, there's no warning
4011 r, w = os.pipe()
4012 fds += r, w
Serhiy Storchaka94a619d2016-02-11 13:11:44 +02004013 with support.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00004014 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00004015
4016 def test_warn_on_dealloc_fd(self):
4017 self._check_warn_on_dealloc_fd("rb", buffering=0)
4018 self._check_warn_on_dealloc_fd("rb")
4019 self._check_warn_on_dealloc_fd("r")
4020
4021
Antoine Pitrou243757e2010-11-05 21:15:39 +00004022 def test_pickling(self):
4023 # Pickling file objects is forbidden
4024 for kwargs in [
4025 {"mode": "w"},
4026 {"mode": "wb"},
4027 {"mode": "wb", "buffering": 0},
4028 {"mode": "r"},
4029 {"mode": "rb"},
4030 {"mode": "rb", "buffering": 0},
4031 {"mode": "w+"},
4032 {"mode": "w+b"},
4033 {"mode": "w+b", "buffering": 0},
4034 ]:
4035 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
4036 with self.open(support.TESTFN, **kwargs) as f:
4037 self.assertRaises(TypeError, pickle.dumps, f, protocol)
4038
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004039 def test_nonblock_pipe_write_bigbuf(self):
4040 self._test_nonblock_pipe_write(16*1024)
4041
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004042 def test_nonblock_pipe_write_smallbuf(self):
4043 self._test_nonblock_pipe_write(1024)
4044
Victor Stinner1db9e7b2014-07-29 22:32:47 +02004045 @unittest.skipUnless(hasattr(os, 'set_blocking'),
4046 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004047 def _test_nonblock_pipe_write(self, bufsize):
4048 sent = []
4049 received = []
4050 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02004051 os.set_blocking(r, False)
4052 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004053
4054 # To exercise all code paths in the C implementation we need
4055 # to play with buffer sizes. For instance, if we choose a
4056 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
4057 # then we will never get a partial write of the buffer.
4058 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
4059 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
4060
4061 with rf, wf:
4062 for N in 9999, 73, 7574:
4063 try:
4064 i = 0
4065 while True:
4066 msg = bytes([i % 26 + 97]) * N
4067 sent.append(msg)
4068 wf.write(msg)
4069 i += 1
4070
4071 except self.BlockingIOError as e:
4072 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01004073 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004074 sent[-1] = sent[-1][:e.characters_written]
4075 received.append(rf.read())
4076 msg = b'BLOCKED'
4077 wf.write(msg)
4078 sent.append(msg)
4079
4080 while True:
4081 try:
4082 wf.flush()
4083 break
4084 except self.BlockingIOError as e:
4085 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01004086 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004087 self.assertEqual(e.characters_written, 0)
4088 received.append(rf.read())
4089
4090 received += iter(rf.read, None)
4091
4092 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03004093 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004094 self.assertTrue(wf.closed)
4095 self.assertTrue(rf.closed)
4096
Charles-François Natalidc3044c2012-01-09 22:40:02 +01004097 def test_create_fail(self):
4098 # 'x' mode fails if file is existing
4099 with self.open(support.TESTFN, 'w'):
4100 pass
4101 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
4102
4103 def test_create_writes(self):
4104 # 'x' mode opens for writing
4105 with self.open(support.TESTFN, 'xb') as f:
4106 f.write(b"spam")
4107 with self.open(support.TESTFN, 'rb') as f:
4108 self.assertEqual(b"spam", f.read())
4109
Christian Heimes7b648752012-09-10 14:48:43 +02004110 def test_open_allargs(self):
4111 # there used to be a buffer overflow in the parser for rawmode
4112 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
4113
4114
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004115class CMiscIOTest(MiscIOTest):
4116 io = io
4117
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03004118 def test_readinto_buffer_overflow(self):
4119 # Issue #18025
4120 class BadReader(self.io.BufferedIOBase):
4121 def read(self, n=-1):
4122 return b'x' * 10**6
4123 bufio = BadReader()
4124 b = bytearray(2)
4125 self.assertRaises(ValueError, bufio.readinto, b)
4126
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004127 def check_daemon_threads_shutdown_deadlock(self, stream_name):
4128 # Issue #23309: deadlocks at shutdown should be avoided when a
4129 # daemon thread and the main thread both write to a file.
4130 code = """if 1:
4131 import sys
4132 import time
4133 import threading
Victor Stinner2a1aed02017-04-21 17:59:23 +02004134 from test.support import SuppressCrashReport
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004135
4136 file = sys.{stream_name}
4137
4138 def run():
4139 while True:
4140 file.write('.')
4141 file.flush()
4142
Victor Stinner2a1aed02017-04-21 17:59:23 +02004143 crash = SuppressCrashReport()
4144 crash.__enter__()
4145 # don't call __exit__(): the crash occurs at Python shutdown
4146
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004147 thread = threading.Thread(target=run)
4148 thread.daemon = True
4149 thread.start()
4150
4151 time.sleep(0.5)
4152 file.write('!')
4153 file.flush()
4154 """.format_map(locals())
4155 res, _ = run_python_until_end("-c", code)
4156 err = res.err.decode()
4157 if res.rc != 0:
4158 # Failure: should be a fatal error
Max Bernsteinccb7ca72019-05-21 10:09:21 -07004159 pattern = (r"Fatal Python error: could not acquire lock "
4160 r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> "
4161 r"at interpreter shutdown, possibly due to "
4162 r"daemon threads".format_map(locals()))
4163 self.assertRegex(err, pattern)
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004164 else:
4165 self.assertFalse(err.strip('.!'))
4166
4167 def test_daemon_threads_shutdown_stdout_deadlock(self):
4168 self.check_daemon_threads_shutdown_deadlock('stdout')
4169
4170 def test_daemon_threads_shutdown_stderr_deadlock(self):
4171 self.check_daemon_threads_shutdown_deadlock('stderr')
4172
4173
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004174class PyMiscIOTest(MiscIOTest):
4175 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00004176
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004177
4178@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
4179class SignalsTest(unittest.TestCase):
4180
4181 def setUp(self):
4182 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
4183
4184 def tearDown(self):
4185 signal.signal(signal.SIGALRM, self.oldalrm)
4186
4187 def alarm_interrupt(self, sig, frame):
4188 1/0
4189
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004190 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
4191 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00004192 invokes the signal handler, and bubbles up the exception raised
4193 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004194 read_results = []
4195 def _read():
4196 s = os.read(r, 1)
4197 read_results.append(s)
Victor Stinner05c9d312018-12-18 23:52:39 +01004198
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004199 t = threading.Thread(target=_read)
4200 t.daemon = True
4201 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00004202 fdopen_kwargs["closefd"] = False
Victor Stinner9c182a12015-03-16 18:03:06 +01004203 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004204 try:
4205 wio = self.io.open(w, **fdopen_kwargs)
Victor Stinner05c9d312018-12-18 23:52:39 +01004206 if hasattr(signal, 'pthread_sigmask'):
4207 # create the thread with SIGALRM signal blocked
4208 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
4209 t.start()
4210 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM])
4211 else:
4212 t.start()
4213
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004214 # Fill the pipe enough that the write will be blocking.
4215 # It will be interrupted by the timer armed above. Since the
4216 # other thread has read one byte, the low-level write will
4217 # return with a successful (partial) result rather than an EINTR.
4218 # The buffered IO layer must check for pending signal
4219 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02004220 signal.alarm(1)
4221 try:
Victor Stinner9c182a12015-03-16 18:03:06 +01004222 self.assertRaises(ZeroDivisionError, wio.write, large_data)
Victor Stinner775b2dd2013-07-15 19:53:13 +02004223 finally:
4224 signal.alarm(0)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03004225 t.join()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004226 # We got one byte, get another one and check that it isn't a
4227 # repeat of the first one.
4228 read_results.append(os.read(r, 1))
4229 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
4230 finally:
4231 os.close(w)
4232 os.close(r)
4233 # This is deliberate. If we didn't close the file descriptor
4234 # before closing wio, wio would try to flush its internal
4235 # buffer, and block again.
4236 try:
4237 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004238 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004239 if e.errno != errno.EBADF:
4240 raise
4241
4242 def test_interrupted_write_unbuffered(self):
4243 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
4244
4245 def test_interrupted_write_buffered(self):
4246 self.check_interrupted_write(b"xy", b"xy", mode="wb")
4247
4248 def test_interrupted_write_text(self):
4249 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
4250
Brett Cannon31f59292011-02-21 19:29:56 +00004251 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004252 def check_reentrant_write(self, data, **fdopen_kwargs):
4253 def on_alarm(*args):
4254 # Will be called reentrantly from the same thread
4255 wio.write(data)
4256 1/0
4257 signal.signal(signal.SIGALRM, on_alarm)
4258 r, w = os.pipe()
4259 wio = self.io.open(w, **fdopen_kwargs)
4260 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004261 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004262 # Either the reentrant call to wio.write() fails with RuntimeError,
4263 # or the signal handler raises ZeroDivisionError.
4264 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
4265 while 1:
4266 for i in range(100):
4267 wio.write(data)
4268 wio.flush()
4269 # Make sure the buffer doesn't fill up and block further writes
4270 os.read(r, len(data) * 100)
4271 exc = cm.exception
4272 if isinstance(exc, RuntimeError):
4273 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
4274 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004275 signal.alarm(0)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004276 wio.close()
4277 os.close(r)
4278
4279 def test_reentrant_write_buffered(self):
4280 self.check_reentrant_write(b"xy", mode="wb")
4281
4282 def test_reentrant_write_text(self):
4283 self.check_reentrant_write("xy", mode="w", encoding="ascii")
4284
Antoine Pitrou707ce822011-02-25 21:24:11 +00004285 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
4286 """Check that a buffered read, when it gets interrupted (either
4287 returning a partial result or EINTR), properly invokes the signal
4288 handler and retries if the latter returned successfully."""
4289 r, w = os.pipe()
4290 fdopen_kwargs["closefd"] = False
4291 def alarm_handler(sig, frame):
4292 os.write(w, b"bar")
4293 signal.signal(signal.SIGALRM, alarm_handler)
4294 try:
4295 rio = self.io.open(r, **fdopen_kwargs)
4296 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004297 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004298 # Expected behaviour:
4299 # - first raw read() returns partial b"foo"
4300 # - second raw read() returns EINTR
4301 # - third raw read() returns b"bar"
4302 self.assertEqual(decode(rio.read(6)), "foobar")
4303 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004304 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004305 rio.close()
4306 os.close(w)
4307 os.close(r)
4308
Antoine Pitrou20db5112011-08-19 20:32:34 +02004309 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004310 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
4311 mode="rb")
4312
Antoine Pitrou20db5112011-08-19 20:32:34 +02004313 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004314 self.check_interrupted_read_retry(lambda x: x,
4315 mode="r")
4316
Antoine Pitrou707ce822011-02-25 21:24:11 +00004317 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
4318 """Check that a buffered write, when it gets interrupted (either
4319 returning a partial result or EINTR), properly invokes the signal
4320 handler and retries if the latter returned successfully."""
4321 select = support.import_module("select")
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004322
Antoine Pitrou707ce822011-02-25 21:24:11 +00004323 # A quantity that exceeds the buffer size of an anonymous pipe's
4324 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02004325 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00004326 r, w = os.pipe()
4327 fdopen_kwargs["closefd"] = False
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004328
Antoine Pitrou707ce822011-02-25 21:24:11 +00004329 # We need a separate thread to read from the pipe and allow the
4330 # write() to finish. This thread is started after the SIGALRM is
4331 # received (forcing a first EINTR in write()).
4332 read_results = []
4333 write_finished = False
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004334 error = None
Antoine Pitrou707ce822011-02-25 21:24:11 +00004335 def _read():
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004336 try:
4337 while not write_finished:
4338 while r in select.select([r], [], [], 1.0)[0]:
4339 s = os.read(r, 1024)
4340 read_results.append(s)
4341 except BaseException as exc:
4342 nonlocal error
4343 error = exc
Antoine Pitrou707ce822011-02-25 21:24:11 +00004344 t = threading.Thread(target=_read)
4345 t.daemon = True
4346 def alarm1(sig, frame):
4347 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004348 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004349 def alarm2(sig, frame):
4350 t.start()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004351
4352 large_data = item * N
Antoine Pitrou707ce822011-02-25 21:24:11 +00004353 signal.signal(signal.SIGALRM, alarm1)
4354 try:
4355 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004356 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004357 # Expected behaviour:
4358 # - first raw write() is partial (because of the limited pipe buffer
4359 # and the first alarm)
4360 # - second raw write() returns EINTR (because of the second alarm)
4361 # - subsequent write()s are successful (either partial or complete)
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004362 written = wio.write(large_data)
4363 self.assertEqual(N, written)
4364
Antoine Pitrou707ce822011-02-25 21:24:11 +00004365 wio.flush()
4366 write_finished = True
4367 t.join()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004368
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004369 self.assertIsNone(error)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004370 self.assertEqual(N, sum(len(x) for x in read_results))
4371 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004372 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004373 write_finished = True
4374 os.close(w)
4375 os.close(r)
4376 # This is deliberate. If we didn't close the file descriptor
4377 # before closing wio, wio would try to flush its internal
4378 # buffer, and could block (in case of failure).
4379 try:
4380 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004381 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00004382 if e.errno != errno.EBADF:
4383 raise
4384
Antoine Pitrou20db5112011-08-19 20:32:34 +02004385 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004386 self.check_interrupted_write_retry(b"x", mode="wb")
4387
Antoine Pitrou20db5112011-08-19 20:32:34 +02004388 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004389 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4390
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004391
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004392class CSignalsTest(SignalsTest):
4393 io = io
4394
4395class PySignalsTest(SignalsTest):
4396 io = pyio
4397
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004398 # Handling reentrancy issues would slow down _pyio even more, so the
4399 # tests are disabled.
4400 test_reentrant_write_buffered = None
4401 test_reentrant_write_text = None
4402
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004403
Ezio Melottidaa42c72013-03-23 16:30:16 +02004404def load_tests(*args):
Gregory P. Smith1bef9072015-04-14 13:24:34 -07004405 tests = (CIOTest, PyIOTest, APIMismatchTest,
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004406 CBufferedReaderTest, PyBufferedReaderTest,
4407 CBufferedWriterTest, PyBufferedWriterTest,
4408 CBufferedRWPairTest, PyBufferedRWPairTest,
4409 CBufferedRandomTest, PyBufferedRandomTest,
4410 StatefulIncrementalDecoderTest,
4411 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
4412 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004413 CMiscIOTest, PyMiscIOTest,
4414 CSignalsTest, PySignalsTest,
4415 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004416
4417 # Put the namespaces of the IO module we are testing and some useful mock
4418 # classes in the __dict__ of each test.
4419 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
benfogle9703f092017-11-10 16:03:40 -05004420 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
4421 SlowFlushRawIO)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004422 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
4423 c_io_ns = {name : getattr(io, name) for name in all_members}
4424 py_io_ns = {name : getattr(pyio, name) for name in all_members}
4425 globs = globals()
4426 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
4427 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
4428 # Avoid turning open into a bound method.
4429 py_io_ns["open"] = pyio.OpenWrapper
4430 for test in tests:
4431 if test.__name__.startswith("C"):
4432 for name, obj in c_io_ns.items():
4433 setattr(test, name, obj)
4434 elif test.__name__.startswith("Py"):
4435 for name, obj in py_io_ns.items():
4436 setattr(test, name, obj)
4437
Ezio Melottidaa42c72013-03-23 16:30:16 +02004438 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
4439 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004440
4441if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02004442 unittest.main()