# Source blob: 5406a2891bb251bcd50831b8d2c4483831691738
"""Unit tests for the io module."""

# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
import abc
import array
import errno
import locale
import os
import pickle
import random
import signal
import sys
import sysconfig
import threading
import time
import unittest
import warnings
import weakref
from collections import deque, UserList
from itertools import cycle, count
from test import support
from test.support.script_helper import assert_python_ok, run_python_until_end
from test.support import FakePath

import codecs
import io  # C implementation of io
import _pyio as pyio  # Python implementation of io
Guido van Rossuma9e20242007-03-08 00:43:48 +000046
try:
    import ctypes
except ImportError:
    def byteslike(*pos, **kw):
        """Create a bytes-like object from the arguments accepted by bytes().

        Fallback used when ctypes cannot be imported: the data is wrapped in
        an array.array of signed bytes.
        """
        return array.array("b", bytes(*pos, **kw))
else:
    class EmptyStruct(ctypes.Structure):
        """Field-less ctypes structure, used only as a resizable buffer."""
        pass

    def byteslike(*pos, **kw):
        """Create a bytes-like object having no string or sequence methods"""
        data = bytes(*pos, **kw)
        obj = EmptyStruct()
        # Grow the structure's buffer to hold the payload, then copy the
        # payload in through a byte-oriented memoryview.
        ctypes.resize(obj, len(data))
        memoryview(obj).cast("B")[:] = data
        return obj
62
# True when the interpreter's build configuration mentions MemorySanitizer,
# either in the compiler flags or in the configure arguments.
_cflags = sysconfig.get_config_var('CFLAGS') or ''
_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
MEMORY_SANITIZER = (
    '-fsanitize=memory' in _cflags or
    '--with-memory-sanitizer' in _config_args
)
69
# Does io.IOBase log unhandled exceptions on calling close()?
# They are silenced by default in release builds.
DESTRUCTOR_LOG_ERRORS = (hasattr(sys, "gettotalrefcount") or sys.flags.dev_mode)
73
74
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000075def _default_chunk_size():
76 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000077 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000078 return f._CHUNK_SIZE
79
80
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto()."""

    def __init__(self, read_stack=()):
        # Chunks handed out by readinto(), in order; a None entry makes
        # readinto() return None once.
        self._read_stack = list(read_stack)
        # Every payload passed to write(), converted to bytes.
        self._write_stack = []
        # Number of read calls, and how many of them found the stack empty.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next queued chunk into *buf* and return the byte count.

        Returns 0 when no chunk is queued and None when the next queued
        entry is None.  A chunk larger than *buf* is split: the remainder
        stays at the front of the queue.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            del self._read_stack[0]
            buf[:n] = data
            return n
        else:
            buf[:] = data[:max_len]
            self._read_stack[0] = data[max_len:]
            return max_len

    def truncate(self, pos=None):
        return pos


class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    pass


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    pass
142
143
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus a read() that pops whole queued chunks."""

    def read(self, n=None):
        # *n* is ignored: each call returns the next queued chunk as is.
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Queue exhausted: count the extra call and report end of data.
            self._extraneous_reads += 1
            return b""


class CMockRawIO(MockRawIO, io.RawIOBase):
    pass


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000159
Guido van Rossuma9e20242007-03-08 00:43:48 +0000160
class MisbehavedRawIO(MockRawIO):
    """Raw stream whose methods deliberately return bogus results."""

    def write(self, b):
        # Claim twice as many bytes as were recorded.
        return super().write(b) * 2

    def read(self, n=None):
        # Return the chunk repeated twice.
        return super().read(n) * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Perform the real read, then report five times the buffer size.
        super().readinto(buf)
        return len(buf) * 5


class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    pass


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    pass
183
184
class SlowFlushRawIO(MockRawIO):
    """Raw stream whose flush() signals an event and then sleeps."""

    def __init__(self):
        super().__init__()
        # Set as soon as flush() has been entered.
        self.in_flush = threading.Event()

    def flush(self):
        self.in_flush.set()
        time.sleep(0.25)


class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
    pass


class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
    pass
199
200
class CloseFailureIO(MockRawIO):
    """Raw stream whose first close() call raises OSError."""

    closed = 0

    def close(self):
        # Mark the stream closed before raising so that later close()
        # calls are silent no-ops.
        if not self.closed:
            self.closed = 1
            raise OSError


class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    pass


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    pass
214
215
class MockFileIO:
    """Mixin recording the size of every read in ``read_history``."""

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        res = super().read(n)
        # Record the length of what was read, or None if the read gave None.
        self.read_history.append(None if res is None else len(res))
        return res

    def readinto(self, b):
        res = super().readinto(b)
        self.read_history.append(res)
        return res


class CMockFileIO(MockFileIO, io.BytesIO):
    pass


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    pass
237
238
class MockUnseekableIO:
    """Mixin that reports the stream as unseekable.

    seek(), tell() and truncate() raise the ``UnsupportedOperation`` class
    that the concrete subclass provides as an attribute.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        raise self.UnsupportedOperation("not seekable")

    def tell(self, *args):
        raise self.UnsupportedOperation("not seekable")

    def truncate(self, *args):
        raise self.UnsupportedOperation("not seekable")


class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    UnsupportedOperation = pyio.UnsupportedOperation
257
258
class MockNonBlockWriterIO:
    """Writable raw-stream mixin that can simulate a write that would block."""

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and clear the record."""
        s = b"".join(self._write_stack)
        self._write_stack[:] = []
        return s

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record *b*, stopping short at the blocker char if one is set.

        Returns the number of bytes recorded, or None when the blocker is
        the very first byte (the blocker is then cleared).
        """
        b = bytes(b)
        n = -1
        if self._blocker_char:
            try:
                n = b.index(self._blocker_char)
            except ValueError:
                pass
            else:
                if n > 0:
                    # write data up to the first blocker
                    self._write_stack.append(b[:n])
                    return n
                else:
                    # cancel blocker and indicate would block
                    self._blocker_char = None
                    return None
        self._write_stack.append(b)
        return len(b)


class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    BlockingIOError = pyio.BlockingIOError
308
Guido van Rossuma9e20242007-03-08 00:43:48 +0000309
Guido van Rossum28524c72007-02-27 05:47:44 +0000310class IOTest(unittest.TestCase):
311
def setUp(self):
    # Start every test without a leftover scratch file.
    support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000314
def tearDown(self):
    # Remove the scratch file the test may have created.
    support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000317
def write_ops(self, f):
    """Check write/seek/tell/truncate results on the writable binary stream *f*.

    *f* must start out empty and positioned at 0.  When every check passes
    the stream holds the twelve bytes of "hello world" plus a newline.
    """
    self.assertEqual(f.write(b"blah."), 5)
    # Truncating must not move the stream position.
    f.truncate(0)
    self.assertEqual(f.tell(), 5)
    f.seek(0)

    self.assertEqual(f.write(b"blah."), 5)
    self.assertEqual(f.seek(0), 0)
    self.assertEqual(f.write(b"Hello."), 6)
    self.assertEqual(f.tell(), 6)
    self.assertEqual(f.seek(-1, 1), 5)
    self.assertEqual(f.tell(), 5)
    buffer = bytearray(b" world\n\n\n")
    self.assertEqual(f.write(buffer), 9)
    buffer[:] = b"*" * 9  # Overwrite our copy of the data
    self.assertEqual(f.seek(0), 0)
    self.assertEqual(f.write(b"h"), 1)
    self.assertEqual(f.seek(-1, 2), 13)
    self.assertEqual(f.tell(), 13)

    self.assertEqual(f.truncate(12), 12)
    self.assertEqual(f.tell(), 13)
    self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000341
def read_ops(self, f, buffered=False):
    """Check read/readinto/seek/tell results on the readable binary stream *f*.

    *f* must contain "hello world" plus a newline and be positioned at 0.
    With *buffered* true, argument-less read() and readinto1() are
    checked as well.
    """
    data = f.read(5)
    self.assertEqual(data, b"hello")
    data = byteslike(data)
    self.assertEqual(f.readinto(data), 5)
    self.assertEqual(bytes(data), b" worl")
    data = bytearray(5)
    # Only two bytes remain; the rest of the buffer must be left alone.
    self.assertEqual(f.readinto(data), 2)
    self.assertEqual(len(data), 5)
    self.assertEqual(data[:2], b"d\n")
    self.assertEqual(f.seek(0), 0)
    self.assertEqual(f.read(20), b"hello world\n")
    self.assertEqual(f.read(1), b"")
    self.assertEqual(f.readinto(byteslike(b"x")), 0)
    self.assertEqual(f.seek(-6, 2), 6)
    self.assertEqual(f.read(5), b"world")
    self.assertEqual(f.read(0), b"")
    self.assertEqual(f.readinto(byteslike()), 0)
    self.assertEqual(f.seek(-6, 1), 5)
    self.assertEqual(f.read(5), b" worl")
    self.assertEqual(f.tell(), 10)
    self.assertRaises(TypeError, f.seek, 0.0)
    if buffered:
        f.seek(0)
        self.assertEqual(f.read(), b"hello world\n")
        f.seek(6)
        self.assertEqual(f.read(), b"world\n")
        self.assertEqual(f.read(), b"")
        f.seek(0)
        data = byteslike(5)
        self.assertEqual(f.readinto1(data), 5)
        self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000374
Guido van Rossum34d69e52007-04-10 20:08:41 +0000375 LARGE = 2**31
376
def large_file_ops(self, f):
    """Check seek/tell/write/truncate around offset ``self.LARGE`` on *f*.

    *f* must be readable and writable.  The test is skipped when seeking
    to ``self.LARGE`` raises OverflowError or ValueError.
    """
    # Use unittest assertions rather than bare asserts so the precondition
    # checks still run under "python -O".
    self.assertTrue(f.readable())
    self.assertTrue(f.writable())
    try:
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
    except (OverflowError, ValueError):
        self.skipTest("no largefile support")
    self.assertEqual(f.tell(), self.LARGE)
    self.assertEqual(f.write(b"xxx"), 3)
    self.assertEqual(f.tell(), self.LARGE + 3)
    self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
    self.assertEqual(f.truncate(), self.LARGE + 2)
    self.assertEqual(f.tell(), self.LARGE + 2)
    self.assertEqual(f.seek(0, 2), self.LARGE + 2)
    self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
    # truncate() with an explicit size must not move the position.
    self.assertEqual(f.tell(), self.LARGE + 2)
    self.assertEqual(f.seek(0, 2), self.LARGE + 1)
    self.assertEqual(f.seek(-1, 2), self.LARGE)
    self.assertEqual(f.read(2), b"x")
396
def test_invalid_operations(self):
    # Try writing on a file opened in read mode and vice-versa.
    exc = self.UnsupportedOperation
    for mode in ("w", "wb"):
        with self.open(support.TESTFN, mode) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
    with self.open(support.TESTFN, "wb", buffering=0) as fp:
        self.assertRaises(exc, fp.read)
        self.assertRaises(exc, fp.readline)
    with self.open(support.TESTFN, "rb", buffering=0) as fp:
        self.assertRaises(exc, fp.write, b"blah")
        self.assertRaises(exc, fp.writelines, [b"blah\n"])
    with self.open(support.TESTFN, "rb") as fp:
        self.assertRaises(exc, fp.write, b"blah")
        self.assertRaises(exc, fp.writelines, [b"blah\n"])
    with self.open(support.TESTFN, "r") as fp:
        self.assertRaises(exc, fp.write, "blah")
        self.assertRaises(exc, fp.writelines, ["blah\n"])
        # Non-zero seeking from current or end pos
        self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
        self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000419
Martin Panter754aab22016-03-31 07:21:56 +0000420 def test_optional_abilities(self):
421 # Test for OSError when optional APIs are not supported
422 # The purpose of this test is to try fileno(), reading, writing and
423 # seeking operations with various objects that indicate they do not
424 # support these operations.
425
426 def pipe_reader():
427 [r, w] = os.pipe()
428 os.close(w) # So that read() is harmless
429 return self.FileIO(r, "r")
430
431 def pipe_writer():
432 [r, w] = os.pipe()
433 self.addCleanup(os.close, r)
434 # Guarantee that we can write into the pipe without blocking
435 thread = threading.Thread(target=os.read, args=(r, 100))
436 thread.start()
437 self.addCleanup(thread.join)
438 return self.FileIO(w, "w")
439
440 def buffered_reader():
441 return self.BufferedReader(self.MockUnseekableIO())
442
443 def buffered_writer():
444 return self.BufferedWriter(self.MockUnseekableIO())
445
446 def buffered_random():
447 return self.BufferedRandom(self.BytesIO())
448
449 def buffered_rw_pair():
450 return self.BufferedRWPair(self.MockUnseekableIO(),
451 self.MockUnseekableIO())
452
453 def text_reader():
454 class UnseekableReader(self.MockUnseekableIO):
455 writable = self.BufferedIOBase.writable
456 write = self.BufferedIOBase.write
457 return self.TextIOWrapper(UnseekableReader(), "ascii")
458
459 def text_writer():
460 class UnseekableWriter(self.MockUnseekableIO):
461 readable = self.BufferedIOBase.readable
462 read = self.BufferedIOBase.read
463 return self.TextIOWrapper(UnseekableWriter(), "ascii")
464
465 tests = (
466 (pipe_reader, "fr"), (pipe_writer, "fw"),
467 (buffered_reader, "r"), (buffered_writer, "w"),
468 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
469 (text_reader, "r"), (text_writer, "w"),
470 (self.BytesIO, "rws"), (self.StringIO, "rws"),
471 )
472 for [test, abilities] in tests:
Martin Panter754aab22016-03-31 07:21:56 +0000473 with self.subTest(test), test() as obj:
474 readable = "r" in abilities
475 self.assertEqual(obj.readable(), readable)
476 writable = "w" in abilities
477 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000478
479 if isinstance(obj, self.TextIOBase):
480 data = "3"
481 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
482 data = b"3"
483 else:
484 self.fail("Unknown base class")
485
486 if "f" in abilities:
487 obj.fileno()
488 else:
489 self.assertRaises(OSError, obj.fileno)
490
491 if readable:
492 obj.read(1)
493 obj.read()
494 else:
495 self.assertRaises(OSError, obj.read, 1)
496 self.assertRaises(OSError, obj.read)
497
498 if writable:
499 obj.write(data)
500 else:
501 self.assertRaises(OSError, obj.write, data)
502
Martin Panter3ee147f2016-03-31 21:05:31 +0000503 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000504 pipe_reader, pipe_writer):
505 # Pipes seem to appear as seekable on Windows
506 continue
507 seekable = "s" in abilities
508 self.assertEqual(obj.seekable(), seekable)
509
Martin Panter754aab22016-03-31 07:21:56 +0000510 if seekable:
511 obj.tell()
512 obj.seek(0)
513 else:
514 self.assertRaises(OSError, obj.tell)
515 self.assertRaises(OSError, obj.seek, 0)
516
517 if writable and seekable:
518 obj.truncate()
519 obj.truncate(0)
520 else:
521 self.assertRaises(OSError, obj.truncate)
522 self.assertRaises(OSError, obj.truncate, 0)
523
Antoine Pitrou13348842012-01-29 18:36:34 +0100524 def test_open_handles_NUL_chars(self):
525 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300526 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100527
528 bytes_fn = bytes(fn_with_NUL, 'ascii')
529 with warnings.catch_warnings():
530 warnings.simplefilter("ignore", DeprecationWarning)
531 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100532
def test_raw_file_io(self):
    # Unbuffered binary file: check capabilities, then run the shared
    # write and read scenarios.
    with self.open(support.TESTFN, "wb", buffering=0) as f:
        self.assertEqual(f.readable(), False)
        self.assertEqual(f.writable(), True)
        self.assertEqual(f.seekable(), True)
        self.write_ops(f)
    with self.open(support.TESTFN, "rb", buffering=0) as f:
        self.assertEqual(f.readable(), True)
        self.assertEqual(f.writable(), False)
        self.assertEqual(f.seekable(), True)
        self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000544
def test_buffered_file_io(self):
    # Buffered binary file: check capabilities, then run the shared
    # write and read scenarios (including the buffered-only reads).
    with self.open(support.TESTFN, "wb") as f:
        self.assertEqual(f.readable(), False)
        self.assertEqual(f.writable(), True)
        self.assertEqual(f.seekable(), True)
        self.write_ops(f)
    with self.open(support.TESTFN, "rb") as f:
        self.assertEqual(f.readable(), True)
        self.assertEqual(f.writable(), False)
        self.assertEqual(f.seekable(), True)
        self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000556
def test_readline(self):
    with self.open(support.TESTFN, "wb") as f:
        f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
    with self.open(support.TESTFN, "rb") as f:
        self.assertEqual(f.readline(), b"abc\n")
        self.assertEqual(f.readline(10), b"def\n")
        # A size limit may cut a line in two.
        self.assertEqual(f.readline(2), b"xy")
        self.assertEqual(f.readline(4), b"zzy\n")
        self.assertEqual(f.readline(), b"foo\x00bar\n")
        self.assertEqual(f.readline(None), b"another line")
        self.assertRaises(TypeError, f.readline, 5.3)
    with self.open(support.TESTFN, "r") as f:
        self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000570
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300571 def test_readline_nonsizeable(self):
572 # Issue #30061
573 # Crash when readline() returns an object without __len__
574 class R(self.IOBase):
575 def readline(self):
576 return None
577 self.assertRaises((TypeError, StopIteration), next, R())
578
579 def test_next_nonsizeable(self):
580 # Issue #30061
581 # Crash when __next__() returns an object without __len__
582 class R(self.IOBase):
583 def __next__(self):
584 return None
585 self.assertRaises(TypeError, R().readlines, 1)
586
Guido van Rossum28524c72007-02-27 05:47:44 +0000587 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000588 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000589 self.write_ops(f)
590 data = f.getvalue()
591 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000592 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000593 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000594
def test_large_file_ops(self):
    # On Windows and Mac OSX this test consumes large resources; It takes
    # a long time to build the >2 GiB file and takes >2 GiB of disk space
    # therefore the resource must be enabled to run this test.
    if sys.platform.startswith('win') or sys.platform == 'darwin':
        support.requires(
            'largefile',
            'test requires %s bytes and a long time to run' % self.LARGE)
    # Run the scenario once unbuffered and once buffered.
    with self.open(support.TESTFN, "w+b", 0) as f:
        self.large_file_ops(f)
    with self.open(support.TESTFN, "w+b") as f:
        self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000607
def test_with_open(self):
    # The file must be closed on leaving the "with" block, both on normal
    # exit and when the body raises.
    for bufsize in (0, 100):
        f = None
        with self.open(support.TESTFN, "wb", bufsize) as f:
            f.write(b"xxx")
        self.assertEqual(f.closed, True)
        f = None
        try:
            with self.open(support.TESTFN, "wb", bufsize) as f:
                1/0
        except ZeroDivisionError:
            self.assertEqual(f.closed, True)
        else:
            self.fail("1/0 didn't raise an exception")
622
# issue 5008
def test_append_mode_tell(self):
    with self.open(support.TESTFN, "wb") as f:
        f.write(b"xxx")
    # A file opened for appending must report a position past the
    # existing content.
    with self.open(support.TESTFN, "ab", buffering=0) as f:
        self.assertEqual(f.tell(), 3)
    with self.open(support.TESTFN, "ab") as f:
        self.assertEqual(f.tell(), 3)
    with self.open(support.TESTFN, "a") as f:
        self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000633
def test_destructor(self):
    # Destroying an open FileIO subclass instance must call __del__,
    # close() and flush() in that order, and the written data must be
    # in the file afterwards.
    record = []
    class MyFileIO(self.FileIO):
        def __del__(self):
            record.append(1)
            # The base class may or may not define __del__.
            try:
                f = super().__del__
            except AttributeError:
                pass
            else:
                f()
        def close(self):
            record.append(2)
            super().close()
        def flush(self):
            record.append(3)
            super().flush()
    with support.check_warnings(('', ResourceWarning)):
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000659
def _check_base_destructor(self, base):
    """Check that destroying a subclass of *base* runs __del__, close()
    and flush() in that order, with instance attributes still readable."""
    record = []
    class MyIO(base):
        def __init__(self):
            # This exercises the availability of attributes on object
            # destruction.
            # (in the C version, close() is called by the tp_dealloc
            # function, not by __del__)
            self.on_del = 1
            self.on_close = 2
            self.on_flush = 3
        def __del__(self):
            record.append(self.on_del)
            # The base class may or may not define __del__.
            try:
                f = super().__del__
            except AttributeError:
                pass
            else:
                f()
        def close(self):
            record.append(self.on_close)
            super().close()
        def flush(self):
            record.append(self.on_flush)
            super().flush()
    f = MyIO()
    del f
    support.gc_collect()
    self.assertEqual(record, [1, 2, 3])
689
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000690 def test_IOBase_destructor(self):
691 self._check_base_destructor(self.IOBase)
692
693 def test_RawIOBase_destructor(self):
694 self._check_base_destructor(self.RawIOBase)
695
696 def test_BufferedIOBase_destructor(self):
697 self._check_base_destructor(self.BufferedIOBase)
698
699 def test_TextIOBase_destructor(self):
700 self._check_base_destructor(self.TextIOBase)
701
def test_close_flushes(self):
    # Data written through a buffered file must be on disk once the
    # file has been closed.
    with self.open(support.TESTFN, "wb") as f:
        f.write(b"xxx")
    with self.open(support.TESTFN, "rb") as f:
        self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000707
def test_array_writes(self):
    # write() must accept an array.array and report its size in bytes,
    # not its number of items; writelines() must accept it too.
    a = array.array('i', range(10))
    n = len(a.tobytes())
    def check(f):
        with f:
            self.assertEqual(f.write(a), n)
            f.writelines((a,))
    check(self.BytesIO())
    check(self.FileIO(support.TESTFN, "w"))
    check(self.BufferedWriter(self.MockRawIO()))
    check(self.BufferedRandom(self.MockRawIO()))
    check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000720
def test_closefd(self):
    # closefd=False is only valid with a file descriptor, not a file name.
    self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                      closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000724
def test_read_closed(self):
    with self.open(support.TESTFN, "w") as f:
        f.write("egg\n")
    with self.open(support.TESTFN, "r") as f:
        # Wrap the same descriptor without taking ownership of it.
        file = self.open(f.fileno(), "r", closefd=False)
        self.assertEqual(file.read(), "egg\n")
        file.seek(0)
        file.close()
        # Reading from the closed wrapper must fail.
        self.assertRaises(ValueError, file.read)
734
def test_no_closefd_with_filename(self):
    # can't use closefd in combination with a file name
    self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000738
def test_closefd_attr(self):
    # The raw layer's closefd attribute mirrors the closefd argument.
    with self.open(support.TESTFN, "wb") as writer:
        writer.write(b"egg\n")
    with self.open(support.TESTFN, "r") as owner:
        self.assertEqual(owner.buffer.raw.closefd, True)
        borrowed = self.open(owner.fileno(), "r", closefd=False)
        self.assertEqual(borrowed.buffer.raw.closefd, False)
746
def test_garbage_collection(self):
    # FileIO objects are collected, and collecting them flushes
    # all data to disk.
    with support.check_warnings(('', ResourceWarning)):
        raw = self.FileIO(support.TESTFN, "wb")
        raw.write(b"abcxxx")
        # Put the object in a reference cycle with itself.
        raw.f = raw
        ref = weakref.ref(raw)
        del raw
        support.gc_collect()
    self.assertIsNone(ref(), ref)
    with self.open(support.TESTFN, "rb") as reader:
        self.assertEqual(reader.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000760
def test_unbounded_file(self):
    # Issue #1174606: reading from an unbounded stream such as /dev/zero.
    zero = "/dev/zero"
    if not os.path.exists(zero):
        self.skipTest("{0} does not exist".format(zero))
    if sys.maxsize > 0x7FFFFFFF:
        self.skipTest("test can only run in a 32-bit address space")
    if support.real_max_memuse < support._2G:
        self.skipTest("test requires at least 2 GiB of memory")
    # Unbuffered binary, buffered binary, then text: an unlimited read()
    # must overflow in every case.
    variants = (
        ("rb", {"buffering": 0}),
        ("rb", {}),
        ("r", {}),
    )
    for mode, extra in variants:
        with self.open(zero, mode, **extra) as stream:
            with self.assertRaises(OverflowError):
                stream.read()
776
def check_flush_error_on_close(self, *args, **kwargs):
    # Helper: open a file with the given arguments, make its flush() fail,
    # and verify that close() propagates the error, still closes the file,
    # and calls flush() while the file is still open.
    stream = self.open(*args, **kwargs)
    closed_at_flush = []

    def failing_flush():
        # Record whether the file was already closed when flush() ran.
        closed_at_flush[:] = [stream.closed]
        raise OSError()

    stream.flush = failing_flush
    with self.assertRaises(OSError):  # exception not swallowed
        stream.close()
    self.assertTrue(stream.closed)
    self.assertTrue(closed_at_flush)  # flush() called
    self.assertFalse(closed_at_flush[0])  # flush() called before file closed
    stream.flush = lambda: None  # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200791
def test_flush_error_on_close(self):
    # Issue #5700: io.FileIO calls flush() after file closed
    # The same three scenarios (by name, by owned fd, by borrowed fd) are
    # exercised for raw, buffered and text files in turn.
    layers = (
        ('wb', {'buffering': 0}),  # raw file
        ('wb', {}),                # buffered io
        ('w', {}),                 # text io
    )
    open_flags = os.O_WRONLY|os.O_CREAT
    for mode, extra in layers:
        self.check_flush_error_on_close(support.TESTFN, mode, **extra)
        fd = os.open(support.TESTFN, open_flags)
        self.check_flush_error_on_close(fd, mode, **extra)
        fd = os.open(support.TESTFN, open_flags)
        self.check_flush_error_on_close(fd, mode, **extra, closefd=False)
        # With closefd=False the descriptor is still ours to close.
        os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000815
def test_multi_close(self):
    # close() may be called repeatedly; flush() on a closed file fails.
    raw = self.open(support.TESTFN, "wb", buffering=0)
    for _ in range(3):
        raw.close()
    with self.assertRaises(ValueError):
        raw.flush()
822
def test_RawIOBase_read(self):
    # Exercise the default limited RawIOBase.read(n) implementation (which
    # calls readinto() internally).
    rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
    # Results of successive read(2) calls, in order.
    expected_chunks = (b"ab", b"c", b"d", None, b"ef", b"g", None, b"")
    for expected in expected_chunks:
        self.assertEqual(rawio.read(2), expected)
835
def test_types_have_dict(self):
    # Instances of these io types must expose a __dict__.
    instances = (
        self.IOBase(),
        self.RawIOBase(),
        self.TextIOBase(),
        self.StringIO(),
        self.BytesIO(),
    )
    for instance in instances:
        has_dict = hasattr(instance, "__dict__")
        self.assertTrue(has_dict)
846
def test_opener(self):
    # A custom opener supplies the descriptor, so the path given to open()
    # is never actually opened.
    with self.open(support.TESTFN, "w") as writer:
        writer.write("egg\n")
    real_fd = os.open(support.TESTFN, os.O_RDONLY)

    def opener(path, flags):
        return real_fd

    with self.open("non-existent", "r", opener=opener) as reader:
        self.assertEqual(reader.read(), "egg\n")
855
def test_bad_opener_negative_1(self):
    # Issue #27066.
    def badopener(fname, flags):
        return -1

    with self.assertRaises(ValueError) as caught:
        open('non-existent', 'r', opener=badopener)
    message = str(caught.exception)
    self.assertEqual(message, 'opener returned -1')
863
def test_bad_opener_other_negative(self):
    # Issue #27066.
    def badopener(fname, flags):
        return -2

    with self.assertRaises(ValueError) as caught:
        open('non-existent', 'r', opener=badopener)
    message = str(caught.exception)
    self.assertEqual(message, 'opener returned -2')
871
def test_fileio_closefd(self):
    # Issue #4841
    with self.open(__file__, 'rb') as first:
        with self.open(__file__, 'rb') as second:
            borrowed = self.FileIO(first.fileno(), closefd=False)
            # .__init__() must not close first
            borrowed.__init__(second.fileno(), closefd=False)
            first.readline()
            # .close() must not close second
            borrowed.close()
            second.readline()
883
def test_nonbuffered_textio(self):
    # Text mode with buffering=0 is rejected, without emitting a
    # ResourceWarning.
    with support.check_no_resource_warning(self), \
            self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300888
def test_invalid_newline(self):
    # An invalid newline argument is rejected, without emitting a
    # ResourceWarning.
    with support.check_no_resource_warning(self), \
            self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300893
def test_buffered_readinto_mixin(self):
    # Test the implementation provided by BufferedIOBase
    class Stream(self.BufferedIOBase):
        def read(self, size):
            return b"12345"
        # read1() is the very same function as read().
        read1 = read

    stream = Stream()
    for name in ("readinto", "readinto1"):
        with self.subTest(name):
            target = byteslike(5)
            fill = getattr(stream, name)
            self.assertEqual(fill(target), 5)
            self.assertEqual(bytes(target), b"12345")
906
def test_fspath_support(self):
    # open() accepts path-like objects wrapping str or bytes, and rejects
    # path-like objects wrapping anything else.
    def check_path_succeeds(path):
        # Round-trip a small payload through the given path object.
        with self.open(path, "w") as writer:
            writer.write("egg\n")

        with self.open(path, "r") as reader:
            self.assertEqual(reader.read(), "egg\n")

    for good_path in (FakePath(support.TESTFN),
                      FakePath(support.TESTFN.encode('utf-8'))):
        check_path_succeeds(good_path)

    # A path-like object wrapping a file descriptor is not acceptable.
    with self.open(support.TESTFN, "w") as holder:
        bad_path = FakePath(holder.fileno())
        with self.assertRaises(TypeError):
            self.open(bad_path, 'w')

    bad_path = FakePath(None)
    with self.assertRaises(TypeError):
        self.open(bad_path, 'w')

    bad_path = FakePath(FloatingPointError)
    with self.assertRaises(FloatingPointError):
        self.open(bad_path, 'w')

    # ensure that refcounting is correct with some error conditions
    with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
        self.open(FakePath(support.TESTFN), 'rwxa')
Ethan Furmand62548a2016-06-04 14:38:43 -0700934
def test_RawIOBase_readall(self):
    # Exercise the default unlimited RawIOBase.read() and readall()
    # implementations.
    chunks = (b"abc", b"d", b"efg")
    via_read = self.MockRawIOWithoutRead(chunks)
    self.assertEqual(via_read.read(), b"abcdefg")
    via_readall = self.MockRawIOWithoutRead(chunks)
    self.assertEqual(via_readall.readall(), b"abcdefg")
942
def test_BufferedIOBase_readinto(self):
    # Exercise the default BufferedIOBase.readinto() and readinto1()
    # implementations (which call read() or read1() internally).
    class Reader(self.BufferedIOBase):
        def __init__(self, avail):
            self.avail = avail

        def read(self, size):
            # Hand out the first `size` bytes and keep the remainder.
            head, tail = self.avail[:size], self.avail[size:]
            self.avail = tail
            return head

        def read1(self, size):
            """Returns no more than 5 bytes at once"""
            return self.read(min(size, 5))

    UNUSED_BYTE = 0x81
    cases = (
        # (test method, total data available, read buffer size, expected
        # read size)
        ("readinto", 10, 5, 5),
        ("readinto", 10, 6, 6),  # More than read1() can return
        ("readinto", 5, 6, 5),  # Buffer larger than total available
        ("readinto", 6, 7, 6),
        ("readinto", 10, 0, 0),  # Empty buffer
        ("readinto1", 10, 5, 5),  # Result limited to single read1() call
        ("readinto1", 10, 6, 5),  # Buffer larger than read1() can return
        ("readinto1", 5, 6, 5),  # Buffer larger than total available
        ("readinto1", 6, 7, 5),
        ("readinto1", 10, 0, 0),  # Empty buffer
    )
    for case in cases:
        with self.subTest(case):
            name, available, bufsize, expected = case
            reader = Reader(bytes(range(available)))
            target = bytearray((UNUSED_BYTE,) * bufsize)
            fill = getattr(reader, name)
            self.assertEqual(fill(target), expected)
            # The buffer keeps its size; only its head is overwritten.
            self.assertEqual(len(target), bufsize)
            self.assertSequenceEqual(target[:expected], range(expected))
            untouched = (UNUSED_BYTE,) * (bufsize - expected)
            self.assertSequenceEqual(target[expected:], untouched)
            self.assertEqual(len(reader.avail), available - expected)
983
def test_close_assert(self):
    # An IOBase subclass that ignores every attribute assignment and whose
    # flush() always fails.
    class R(self.IOBase):
        def __setattr__(self, name, value):
            pass

        def flush(self):
            raise OSError()

    stream = R()
    # This would cause an assertion failure.
    with self.assertRaises(OSError):
        stream.close()

    # Silence destructor error
    R.flush = lambda self: None
996
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200997
class CIOTest(IOTest):

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        instance = MyIO()
        # Tie the instance into a cycle with itself.
        instance.obj = instance
        ref = weakref.ref(instance)
        del MyIO
        del instance
        support.gc_collect()
        self.assertIsNone(ref(), ref)
Guido van Rossuma9e20242007-03-08 00:43:48 +00001017
class PyIOTest(IOTest):
    # Adds no tests of its own; everything comes from IOTest.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001020
Guido van Rossuma9e20242007-03-08 00:43:48 +00001021
@support.cpython_only
class APIMismatchTest(unittest.TestCase):

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        missing = support.detect_api_mismatch(
            pyio.RawIOBase,
            io.RawIOBase,
            ignore=('__weakref__',),
        )
        self.assertEqual(
            missing,
            set(),
            msg='Python RawIOBase does not have all C RawIOBase methods',
        )

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        missing = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        self.assertEqual(
            missing,
            set(),
            msg='C RawIOBase does not have all Python RawIOBase methods',
        )
1035
1036
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom

    def test_detach(self):
        # detach() hands back the raw stream once; a second call fails.
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        self.assertIs(buffered.detach(), raw)
        with self.assertRaises(ValueError):
            buffered.detach()

        repr(buffered)  # Should still work

    def test_fileno(self):
        buffered = self.tp(self.MockRawIO())

        self.assertEqual(42, buffered.fileno())

    def test_invalid_args(self):
        buffered = self.tp(self.MockRawIO())
        # Invalid whence
        for bad_whence in (-1, 9):
            self.assertRaises(ValueError, buffered.seek, 0, bad_whence)

    def test_override_destructor(self):
        # Overridden __del__, close() and flush() must each run once, in
        # that order, when the object is destroyed.
        base = self.tp
        calls = []

        class MyBufferedIO(base):
            def __del__(self):
                calls.append(1)
                try:
                    parent_del = super().__del__
                except AttributeError:
                    pass
                else:
                    parent_del()

            def close(self):
                calls.append(2)
                super().close()

            def flush(self):
                calls.append(3)
                super().flush()

        raw = self.MockRawIO()
        buffered = MyBufferedIO(raw)
        del buffered
        support.gc_collect()
        self.assertEqual(calls, [1, 2, 3])

    def test_context_manager(self):
        # Test usability as a context manager
        buffered = self.tp(self.MockRawIO())

        def enter_and_exit():
            with buffered:
                pass

        enter_and_exit()
        # buffered should now be closed, and using it a second time should
        # raise a ValueError.
        self.assertRaises(ValueError, enter_and_exit)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        raw = self.CloseFailureIO()

        def touch_missing_attribute():
            self.tp(raw).xyzzy

        with support.captured_output("stderr") as captured:
            self.assertRaises(AttributeError, touch_missing_attribute)
        output = captured.getvalue().strip()
        if not output:
            return
        # The destructor *may* have printed an unraisable error, check it
        lines = output.splitlines()
        if DESTRUCTOR_LOG_ERRORS:
            self.assertEqual(len(lines), 5)
            self.assertTrue(lines[0].startswith("Exception ignored in: "), lines)
            self.assertEqual(lines[1], "Traceback (most recent call last):", lines)
            self.assertEqual(lines[4], 'OSError:', lines)
        else:
            self.assertEqual(len(lines), 1)
            self.assertTrue(lines[-1].startswith("Exception OSError: "), lines)
            self.assertTrue(lines[-1].endswith(" ignored"), lines)

    def test_repr(self):
        # repr() shows the qualified class name, plus the raw stream's name
        # once it has one.
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
        self.assertEqual(repr(buffered), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(buffered), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(buffered), "<%s name=b'dummy'>" % clsname)

    def test_recursive_repr(self):
        # Issue #25455
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        # Make the raw stream's name refer back to the buffered object.
        with support.swap_attr(raw, 'name', buffered):
            try:
                repr(buffered)  # Should not crash
            except RuntimeError:
                pass

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw = self.MockRawIO()
        closed_at_flush = []

        def failing_flush():
            # Record the closed state of both layers at flush time.
            closed_at_flush[:] = [buffered.closed, raw.closed]
            raise OSError()

        raw.flush = failing_flush
        buffered = self.tp(raw)
        with self.assertRaises(OSError):  # exception not swallowed
            buffered.close()
        self.assertTrue(buffered.closed)
        self.assertTrue(raw.closed)
        self.assertTrue(closed_at_flush)  # flush() called
        self.assertFalse(closed_at_flush[0])  # flush() called before file closed
        self.assertFalse(closed_at_flush[1])
        raw.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and close() fail, the close() error is raised
        # with the flush() error attached as its context.
        raw = self.MockRawIO()

        def bad_flush():
            raise OSError('flush')

        def bad_close():
            raise OSError('close')

        raw.close = bad_close
        buffered = self.tp(raw)
        buffered.flush = bad_flush
        with self.assertRaises(OSError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertEqual(error.args, ('close',))
        self.assertIsInstance(error.__context__, OSError)
        self.assertEqual(error.__context__.args, ('flush',))
        self.assertFalse(buffered.closed)

        # Silence destructor error
        raw.close = lambda: None
        buffered.flush = lambda: None

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        raw = self.MockRawIO()

        def bad_flush():
            raise non_existing_flush

        def bad_close():
            raise non_existing_close

        raw.close = bad_close
        buffered = self.tp(raw)
        buffered.flush = bad_flush
        with self.assertRaises(NameError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertIn('non_existing_close', str(error))
        self.assertIsInstance(error.__context__, NameError)
        self.assertIn('non_existing_flush', str(error.__context__))
        self.assertFalse(buffered.closed)

        # Silence destructor error
        buffered.flush = lambda: None
        raw.close = lambda: None

    def test_multi_close(self):
        # close() may be called repeatedly; flush() afterwards fails.
        buffered = self.tp(self.MockRawIO())
        for _ in range(3):
            buffered.close()
        with self.assertRaises(ValueError):
            buffered.flush()

    def test_unseekable(self):
        # tell() and seek() are unsupported over an unseekable raw stream.
        buffered = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, buffered.tell)
        self.assertRaises(self.UnsupportedOperation, buffered.seek, 0)

    def test_readonly_attributes(self):
        # The raw attribute cannot be reassigned.
        buffered = self.tp(self.MockRawIO())
        replacement = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buffered.raw = replacement
1217
Guido van Rossum78892e42007-04-06 17:31:18 +00001218
class SizeofTest:

    @support.cpython_only
    def test_sizeof(self):
        # The reported size must grow by exactly the difference between the
        # two buffer sizes.
        small, large = 4096, 8192
        buffered = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(buffered) - small
        buffered = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(buffered), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the reported size no longer includes the buffer.
        bufsize = 4096
        buffered = self.tp(self.MockRawIO(), buffer_size=bufsize)
        overhead = sys.getsizeof(buffered) - bufsize
        buffered.close()
        self.assertEqual(sys.getsizeof(buffered), overhead)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001240
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001241class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1242 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001243
def test_constructor(self):
    # __init__() may be called again on an existing object, with valid
    # buffer sizes only.
    raw = self.MockRawIO([b"abc"])
    buffered = self.tp(raw)
    buffered.__init__(raw)
    for good_size in (1024, 16):
        buffered.__init__(raw, buffer_size=good_size)
    self.assertEqual(b"abc", buffered.read())
    for bad_size in (0, -16, -1):
        self.assertRaises(ValueError, buffered.__init__, raw,
                          buffer_size=bad_size)
    # Re-initialising over a fresh raw stream works after the failures.
    raw = self.MockRawIO([b"abc"])
    buffered.__init__(raw)
    self.assertEqual(b"abc", buffered.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001257
def test_uninitialized(self):
    # An object made with __new__() alone can be discarded, refuses to
    # read, and becomes usable once __init__() has run.
    discarded = self.tp.__new__(self.tp)
    del discarded
    buffered = self.tp.__new__(self.tp)
    with self.assertRaisesRegex((ValueError, AttributeError),
                                'uninitialized|has no attribute'):
        buffered.read(0)
    buffered.__init__(self.MockRawIO())
    self.assertEqual(buffered.read(0), b'')
1267
def test_read(self):
    # read(None) and read(7) both return the whole 7-byte payload.
    for size in (None, 7):
        raw = self.MockRawIO((b"abc", b"d", b"efg"))
        buffered = self.tp(raw)
        self.assertEqual(b"abcdefg", buffered.read(size))
    # Invalid args
    with self.assertRaises(ValueError):
        buffered.read(-2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001275
def test_read1(self):
    # After each read1() call, check the data returned and, where given,
    # the raw stream's _reads counter.
    raw = self.MockRawIO((b"abc", b"d", b"efg"))
    buffered = self.tp(raw)
    self.assertEqual(b"a", buffered.read(1))
    # (size passed to read1(), expected data, expected raw._reads or None
    # when the counter is not checked after that step)
    steps = (
        (1, b"b", 1),
        (0, b"", None),
        (100, b"c", 1),
        (100, b"d", 2),
        (100, b"efg", 3),
        (100, b"", 4),
    )
    for size, expected, raw_reads in steps:
        self.assertEqual(expected, buffered.read1(size))
        if raw_reads is not None:
            self.assertEqual(raw._reads, raw_reads)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001291
def test_read1_arbitrary(self):
    # read1() with no argument, or with -1, returns the available data.
    raw = self.MockRawIO((b"abc", b"d", b"efg"))
    buffered = self.tp(raw)
    first_byte = buffered.read(1)
    self.assertEqual(b"a", first_byte)
    self.assertEqual(b"bc", buffered.read1())
    self.assertEqual(b"d", buffered.read1())
    self.assertEqual(b"efg", buffered.read1(-1))
    self.assertEqual(raw._reads, 3)
    self.assertEqual(b"", buffered.read1())
    self.assertEqual(raw._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001302
def test_readinto(self):
    # readinto() fills a two-byte buffer; a short read leaves the tail of
    # the buffer as it was.
    raw = self.MockRawIO((b"abc", b"d", b"efg"))
    buffered = self.tp(raw)
    target = bytearray(2)
    # (expected byte count, expected buffer contents) per call
    for count, contents in ((2, b"ab"), (2, b"cd"), (2, b"ef"),
                            (1, b"gf"), (0, b"gf")):
        self.assertEqual(buffered.readinto(target), count)
        self.assertEqual(target, contents)
    # Same buffer again, now over a raw stream that yields None after its
    # data.
    raw = self.MockRawIO((b"abc", None))
    buffered = self.tp(raw)
    for count, contents in ((2, b"ab"), (1, b"cb")):
        self.assertEqual(buffered.readinto(target), count)
        self.assertEqual(target, contents)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001323
def test_readinto1(self):
    # Track the raw stream's _reads counter across peek() and readinto1()
    # calls.
    buffer_size = 10
    raw = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
    buffered = self.tp(raw, buffer_size=buffer_size)
    target = bytearray(2)

    self.assertEqual(buffered.peek(3), b'abc')
    self.assertEqual(raw._reads, 1)

    self.assertEqual(buffered.readinto1(target), 2)
    self.assertEqual(target, b"ab")
    self.assertEqual(raw._reads, 1)

    self.assertEqual(buffered.readinto1(target), 1)
    self.assertEqual(target[:1], b"c")
    self.assertEqual(raw._reads, 1)

    self.assertEqual(buffered.readinto1(target), 2)
    self.assertEqual(target, b"de")
    self.assertEqual(raw._reads, 2)

    # Switch to a target twice as large as the internal buffer.
    target = bytearray(2*buffer_size)
    self.assertEqual(buffered.peek(3), b'fgh')
    self.assertEqual(raw._reads, 3)

    self.assertEqual(buffered.readinto1(target), 6)
    self.assertEqual(target[:6], b"fghjkl")
    self.assertEqual(raw._reads, 4)
1346
def test_readinto_array(self):
    # readinto() must count bytes, not array items, when given an array
    # with multi-byte elements.
    buffer_size = 60
    data = b"a" * 26
    buffered = self.tp(self.MockRawIO((data,)), buffer_size=buffer_size)

    # Create an array with element size > 1 byte
    target = array.array('i', b'x' * 32)
    assert len(target) != 16

    # Read into it. We should get as many *bytes* as we can fit into target
    # (which is more than the number of elements)
    nread = buffered.readinto(target)
    self.assertGreater(nread, len(target))

    # Check that old contents of target are preserved
    view = memoryview(target).cast('B')
    self.assertLess(nread, len(view))
    self.assertEqual(view[:nread], data[:nread])
    leftover = view[nread:]
    self.assertEqual(leftover, b'x' * (len(leftover)))
1367
def test_readinto1_array(self):
    # readinto1() must count bytes, not array items, when given an array
    # with multi-byte elements.
    buffer_size = 60
    data = b"a" * 26
    buffered = self.tp(self.MockRawIO((data,)), buffer_size=buffer_size)

    # Create an array with element size > 1 byte
    target = array.array('i', b'x' * 32)
    assert len(target) != 16

    # Read into it. We should get as many *bytes* as we can fit into target
    # (which is more than the number of elements)
    nread = buffered.readinto1(target)
    self.assertGreater(nread, len(target))

    # Check that old contents of target are preserved
    view = memoryview(target).cast('B')
    self.assertLess(nread, len(view))
    self.assertEqual(view[:nread], data[:nread])
    leftover = view[nread:]
    self.assertEqual(leftover, b'x' * (len(leftover)))
1388
def test_readlines(self):
    # readlines() with no hint, a hint of 5, and a hint of None.
    def fresh_reader():
        # Every call gets its own reader over the same three chunks.
        return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))

    all_lines = [b"abc\n", b"d\n", b"ef"]
    self.assertEqual(fresh_reader().readlines(), all_lines)
    self.assertEqual(fresh_reader().readlines(5), [b"abc\n", b"d\n"])
    self.assertEqual(fresh_reader().readlines(None), all_lines)
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001396
def test_buffering(self):
    # For several buffer sizes, check the data each buffered read returns
    # and the read sizes recorded by the raw file.
    data = b"abcdefghi"
    dlen = len(data)

    # [buffer size, sizes of buffered reads, expected sizes of raw reads]
    scenarios = [
        [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
        [ 100, [ 3, 3, 3], [ dlen ] ],
        [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
    ]

    for bufsize, buffered_sizes, raw_sizes in scenarios:
        raw = self.MockFileIO(data)
        buffered = self.tp(raw, buffer_size=bufsize)
        offset = 0
        for nbytes in buffered_sizes:
            chunk = buffered.read(nbytes)
            self.assertEqual(chunk, data[offset:offset+nbytes])
            offset += nbytes
        # this is mildly implementation-dependent
        self.assertEqual(raw.read_history, raw_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001416
def test_read_non_blocking(self):
    # Inject some None's in there to simulate EWOULDBLOCK
    raw = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
    buffered = self.tp(raw)
    self.assertEqual(b"abcd", buffered.read(6))
    self.assertEqual(b"e", buffered.read(1))
    self.assertEqual(b"fg", buffered.read())
    self.assertEqual(b"", buffered.peek(1))
    self.assertIsNone(buffered.read())
    self.assertEqual(b"", buffered.read())

    # The raw layer's readall() on its own: data first, then None.
    raw = self.MockRawIO((b"a", None, None))
    self.assertEqual(b"a", raw.readall())
    self.assertIsNone(raw.readall())
1431
def test_read_past_eof(self):
    # Requesting far more bytes than exist returns everything available.
    chunks = (b"abc", b"d", b"efg")
    buffered = self.tp(self.MockRawIO(chunks))

    self.assertEqual(b"abcdefg", buffered.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001437
def test_read_all(self):
    # read() without a size concatenates every chunk of the raw stream.
    chunks = (b"abc", b"d", b"efg")
    buffered = self.tp(self.MockRawIO(chunks))

    self.assertEqual(b"abcdefg", buffered.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001443
@support.requires_resource('cpu')
def test_threads(self):
    try:
        # Write out many bytes with exactly the same number of 0's,
        # 1's... 255's. This will help us check that concurrent reading
        # doesn't duplicate or forget contents.
        N = 1000
        values = list(range(256)) * N
        random.shuffle(values)
        with self.open(support.TESTFN, "wb") as out:
            out.write(bytes(values))
        with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            errors = []
            results = []

            def reader():
                try:
                    # Intra-buffer read then buffer-flushing read
                    for n in cycle([1, 19]):
                        chunk = bufio.read(n)
                        if not chunk:
                            break
                        # list.append() is atomic
                        results.append(chunk)
                except Exception as exc:
                    # Record the failure for the main thread, then let it
                    # propagate out of the worker as well.
                    errors.append(exc)
                    raise

            threads = [threading.Thread(target=reader) for _ in range(20)]
            with support.start_threads(threads):
                time.sleep(0.02)  # yield
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            # Every byte value must show up exactly N times across all the
            # chunks collected by the workers.
            combined = b''.join(results)
            for value in range(256):
                self.assertEqual(combined.count(bytes([value])), N)
    finally:
        support.unlink(support.TESTFN)
1483
def test_unseekable(self):
    # tell() and seek() must be refused both before and after a read.
    bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
    unsupported = self.UnsupportedOperation
    self.assertRaises(unsupported, bufio.tell)
    self.assertRaises(unsupported, bufio.seek, 0)
    bufio.read(1)
    self.assertRaises(unsupported, bufio.seek, 0)
    self.assertRaises(unsupported, bufio.tell)
1491
def test_misbehaved_io(self):
    # On top of a MisbehavedRawIO, seek() and tell() must raise OSError.
    chunks = (b"abc", b"d", b"efg")
    buffered = self.tp(self.MisbehavedRawIO(chunks))
    with self.assertRaises(OSError):
        buffered.seek(0)
    with self.assertRaises(OSError):
        buffered.tell()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001497
def test_no_extraneous_read(self):
    # Issue #9550; when the raw IO object has satisfied the read request,
    # we should not issue any additional reads, otherwise it may block
    # (e.g. socket).
    bufsize = 16
    request_sizes = (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2)
    for n in request_sizes:
        expected = b"x" * n
        raw_layouts = (
            # Simple case: one raw read is enough to satisfy the request.
            [expected],
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            [b"x" * (n - 1), b"x"],
        )
        for raw_chunks in raw_layouts:
            rawio = self.MockRawIO(raw_chunks)
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), expected)
            self.assertEqual(rawio._extraneous_reads, 0,
                "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1517
def test_read_on_closed(self):
    # Issue #23796
    # peek() and read1() on a closed reader must raise ValueError, even
    # though one unread byte is still sitting in the buffer.
    # NOTE(review): this builds io.BufferedReader directly instead of
    # self.tp, so every subclass exercises the same type here -- confirm
    # whether that is intentional.
    reader = io.BufferedReader(io.BytesIO(b"12"))
    reader.read(1)
    reader.close()
    with self.assertRaises(ValueError):
        reader.peek()
    with self.assertRaises(ValueError):
        reader.read1(1)
1525
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001526
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Binds the shared reader tests to io.BufferedReader and adds checks
    # that are only run against that type.
    tp = io.BufferedReader

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # An absurdly large buffer size must be rejected with one of these
        # exceptions.
        self.assertRaises((OverflowError, MemoryError, ValueError),
                          bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # Re-initialising with a non-positive buffer size raises ValueError,
        # and a following read() raises ValueError as well.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        chunks = (b"abc", b"d", b"efg")
        bufio = self.tp(self.MisbehavedRawIO(chunks))
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        with self.assertRaises(OSError):
            bufio.read(10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            # Create a reference cycle through the object itself so that only
            # the garbage collector can reclaim it.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError message must name the
        # type.
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1576
1577
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared BufferedReaderTest methods against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001580
Guido van Rossuma9e20242007-03-08 00:43:48 +00001581
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Tests for the buffered writer type bound to ``tp`` by the subclasses.
    # Mode used when a test opens a real file for writing.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be called repeatedly with valid sizes; invalid sizes
        # raise ValueError, and a later valid re-init leaves the object
        # usable.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_uninitialized(self):
        # Deleting an object created with __new__ only must not misbehave.
        bufio = self.tp.__new__(self.tp)
        del bufio
        # Using one before __init__ raises a recognisable error ...
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.write, b'')
        # ... and it works normally once initialised.
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.write(b''), 0)

    def test_detach_flush(self):
        # detach() must push buffered data down to the raw object.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)
        buffer = bytearray(b"def")
        bufio.write(buffer)
        buffer[:] = b"***"  # Overwrite our copy of the data
        bufio.flush()
        self.assertEqual(b"".join(writer._write_stack), b"abcdef")

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Helper: ``intermediate_func(bufio)`` is called after every write so
        # callers can interleave flushes, seeks or truncates.
        # Lots of writes, test the flushed output is as expected.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for _ in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Clamp the final write so it never runs past the payload.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seeking away and back (absolute, then relative) between writes must
        # not disturb the data that ends up in the raw object.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Writes, a rewind and an overwrite must land at the right offsets of
        # the underlying BytesIO.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # Same as test_writelines, but with a UserList instead of a list.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        # Non-bytes items, None and a str argument all raise TypeError.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)
        self.assertRaises(TypeError, bufio.writelines, 'abc')

    def test_destructor(self):
        # Dropping the last reference must flush pending data to the raw
        # object.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # The position is expected to stay at 6 even though the file is
            # now 3 bytes long.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    def test_truncate_after_write(self):
        # Ensure that truncate preserves the file position after
        # writes longer than the buffer size.
        # Issue: https://bugs.python.org/issue32228
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, "wb") as f:
            # Fill with some buffer
            f.write(b'\x00' * 10000)
        buffer_sizes = [8192, 4096, 200]
        for buffer_size in buffer_sizes:
            with self.open(support.TESTFN, "r+b", buffering=buffer_size) as f:
                f.write(b'\x00' * (buffer_size + 1))
                # After write write_pos and write_end are set to 0
                f.read(1)
                # read operation makes sure that pos != raw_pos
                f.truncate()
                self.assertEqual(f.tell(), buffer_size + 2)

    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for _ in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02)  # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Chunks may land in any order, so only per-value counts are
            # checked.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)
        self.assertRaises(OSError, bufio.write, b"abcdef")

    def test_max_buffer_size_removal(self):
        # A third positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()
        def bad_write(b):
            raise OSError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(OSError, b.close)  # exception not swallowed
        self.assertTrue(b.closed)

    def test_slow_close_from_thread(self):
        # Issue #31976
        # While another thread is inside close(), a write from this thread
        # must raise ValueError and the object must already report closed.
        rawio = self.SlowFlushRawIO()
        bufio = self.tp(rawio, 8)
        t = threading.Thread(target=bufio.close)
        t.start()
        try:
            rawio.in_flush.wait()
            self.assertRaises(ValueError, bufio.write, b'spam')
            self.assertTrue(bufio.closed)
        finally:
            # Always reap the closer thread, even when an assertion above
            # fails, so it is not left running after the test.
            t.join()
1870
1871
Benjamin Peterson59406a92009-03-26 17:10:29 +00001872
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Binds the shared writer tests to io.BufferedWriter and adds checks
    # that are only run against that type.
    tp = io.BufferedWriter

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # An absurdly large buffer size must be rejected with one of these
        # exceptions.
        self.assertRaises((OverflowError, MemoryError, ValueError),
                          bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # Re-initialising with a non-positive buffer size raises ValueError,
        # and a following write() raises ValueError as well.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            f.write(b"123xxx")
            # Create a reference cycle through the object itself so that only
            # the garbage collector can reclaim it.
            f.x = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        # The bytes written before collection must have reached the file.
        with self.open(support.TESTFN, "rb") as stored:
            self.assertEqual(stored.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError message must name the
        # type.
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1919
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001920
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriterTest methods against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001923
Guido van Rossum01a27522007-03-07 01:00:12 +00001924class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001925
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001926 def test_constructor(self):
1927 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001928 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001929
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001930 def test_uninitialized(self):
1931 pair = self.tp.__new__(self.tp)
1932 del pair
1933 pair = self.tp.__new__(self.tp)
1934 self.assertRaisesRegex((ValueError, AttributeError),
1935 'uninitialized|has no attribute',
1936 pair.read, 0)
1937 self.assertRaisesRegex((ValueError, AttributeError),
1938 'uninitialized|has no attribute',
1939 pair.write, b'')
1940 pair.__init__(self.MockRawIO(), self.MockRawIO())
1941 self.assertEqual(pair.read(0), b'')
1942 self.assertEqual(pair.write(b''), 0)
1943
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001944 def test_detach(self):
1945 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1946 self.assertRaises(self.UnsupportedOperation, pair.detach)
1947
Florent Xicluna109d5732012-07-07 17:03:22 +02001948 def test_constructor_max_buffer_size_removal(self):
1949 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001950 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001951
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001952 def test_constructor_with_not_readable(self):
1953 class NotReadable(MockRawIO):
1954 def readable(self):
1955 return False
1956
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001957 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001958
1959 def test_constructor_with_not_writeable(self):
1960 class NotWriteable(MockRawIO):
1961 def writable(self):
1962 return False
1963
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001964 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001965
1966 def test_read(self):
1967 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1968
1969 self.assertEqual(pair.read(3), b"abc")
1970 self.assertEqual(pair.read(1), b"d")
1971 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001972 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1973 self.assertEqual(pair.read(None), b"abc")
1974
1975 def test_readlines(self):
1976 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1977 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1978 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1979 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001980
1981 def test_read1(self):
1982 # .read1() is delegated to the underlying reader object, so this test
1983 # can be shallow.
1984 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1985
1986 self.assertEqual(pair.read1(3), b"abc")
Martin Panterccb2c0e2016-10-20 23:48:14 +00001987 self.assertEqual(pair.read1(), b"def")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001988
1989 def test_readinto(self):
Martin Panter6bb91f32016-05-28 00:41:57 +00001990 for method in ("readinto", "readinto1"):
1991 with self.subTest(method):
1992 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001993
Serhiy Storchaka5f1a5182016-09-11 14:41:02 +03001994 data = byteslike(b'\0' * 5)
Martin Panter6bb91f32016-05-28 00:41:57 +00001995 self.assertEqual(getattr(pair, method)(data), 5)
1996 self.assertEqual(bytes(data), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001997
1998 def test_write(self):
1999 w = self.MockRawIO()
2000 pair = self.tp(self.MockRawIO(), w)
2001
2002 pair.write(b"abc")
2003 pair.flush()
Martin Panter6bb91f32016-05-28 00:41:57 +00002004 buffer = bytearray(b"def")
2005 pair.write(buffer)
2006 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002007 pair.flush()
2008 self.assertEqual(w._write_stack, [b"abc", b"def"])
2009
2010 def test_peek(self):
2011 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2012
2013 self.assertTrue(pair.peek(3).startswith(b"abc"))
2014 self.assertEqual(pair.read(3), b"abc")
2015
2016 def test_readable(self):
2017 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2018 self.assertTrue(pair.readable())
2019
2020 def test_writeable(self):
2021 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2022 self.assertTrue(pair.writable())
2023
2024 def test_seekable(self):
2025 # BufferedRWPairs are never seekable, even if their readers and writers
2026 # are.
2027 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2028 self.assertFalse(pair.seekable())
2029
2030 # .flush() is delegated to the underlying writer object and has been
2031 # tested in the test_write method.
2032
2033 def test_close_and_closed(self):
2034 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2035 self.assertFalse(pair.closed)
2036 pair.close()
2037 self.assertTrue(pair.closed)
2038
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002039 def test_reader_close_error_on_close(self):
2040 def reader_close():
2041 reader_non_existing
2042 reader = self.MockRawIO()
2043 reader.close = reader_close
2044 writer = self.MockRawIO()
2045 pair = self.tp(reader, writer)
2046 with self.assertRaises(NameError) as err:
2047 pair.close()
2048 self.assertIn('reader_non_existing', str(err.exception))
2049 self.assertTrue(pair.closed)
2050 self.assertFalse(reader.closed)
2051 self.assertTrue(writer.closed)
2052
Victor Stinner472f7942019-04-12 21:58:24 +02002053 # Silence destructor error
2054 reader.close = lambda: None
2055
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002056 def test_writer_close_error_on_close(self):
2057 def writer_close():
2058 writer_non_existing
2059 reader = self.MockRawIO()
2060 writer = self.MockRawIO()
2061 writer.close = writer_close
2062 pair = self.tp(reader, writer)
2063 with self.assertRaises(NameError) as err:
2064 pair.close()
2065 self.assertIn('writer_non_existing', str(err.exception))
2066 self.assertFalse(pair.closed)
2067 self.assertTrue(reader.closed)
2068 self.assertFalse(writer.closed)
2069
Victor Stinner472f7942019-04-12 21:58:24 +02002070 # Silence destructor error
2071 writer.close = lambda: None
2072
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002073 def test_reader_writer_close_error_on_close(self):
2074 def reader_close():
2075 reader_non_existing
2076 def writer_close():
2077 writer_non_existing
2078 reader = self.MockRawIO()
2079 reader.close = reader_close
2080 writer = self.MockRawIO()
2081 writer.close = writer_close
2082 pair = self.tp(reader, writer)
2083 with self.assertRaises(NameError) as err:
2084 pair.close()
2085 self.assertIn('reader_non_existing', str(err.exception))
2086 self.assertIsInstance(err.exception.__context__, NameError)
2087 self.assertIn('writer_non_existing', str(err.exception.__context__))
2088 self.assertFalse(pair.closed)
2089 self.assertFalse(reader.closed)
2090 self.assertFalse(writer.closed)
2091
Victor Stinner472f7942019-04-12 21:58:24 +02002092 # Silence destructor error
2093 reader.close = lambda: None
2094 writer.close = lambda: None
2095
def test_isatty(self):
    class SelectableIsAtty(MockRawIO):
        # Raw stream whose isatty() answer is fixed at construction time.
        def __init__(self, isatty):
            MockRawIO.__init__(self)
            self._isatty = isatty

        def isatty(self):
            return self._isatty

    # (reader is a tty, writer is a tty, pair must report a tty)
    scenarios = (
        (False, False, False),
        (True, False, True),
        (False, True, True),
        (True, True, True),
    )
    for reader_tty, writer_tty, pair_tty in scenarios:
        pair = self.tp(SelectableIsAtty(reader_tty),
                       SelectableIsAtty(writer_tty))
        verify = self.assertTrue if pair_tty else self.assertFalse
        verify(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00002116
def test_weakref_clearing(self):
    # Drop the pair first, then the weak reference that pointed at it.
    pair = self.tp(self.MockRawIO(), self.MockRawIO())
    weak = weakref.ref(pair)
    pair = None
    weak = None  # Shouldn't segfault.
2122
class CBufferedRWPairTest(BufferedRWPairTest):
    # Run the shared BufferedRWPair tests against the ``io`` implementation.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002125
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Run the shared BufferedRWPair tests against the ``pyio`` implementation.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002128
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002129
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Checks for buffered streams that both read and write.  The class under
    # test is taken from the ``tp`` attribute set by concrete subclasses.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Run the constructor checks of both parents explicitly.
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_constructor(self)

    def test_uninitialized(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_uninitialized(self)

    def test_read_and_write(self):
        raw_stream = self.MockRawIO((b"asdf", b"ghjk"))
        stream = self.tp(raw_stream, 8)

        self.assertEqual(b"as", stream.read(2))
        for chunk in (b"ddd", b"eee"):
            stream.write(chunk)
        # Both writes must still be sitting in the buffer.
        self.assertFalse(raw_stream._write_stack)
        self.assertEqual(b"ghjk", stream.read())
        self.assertEqual(b"dddeee", raw_stream._write_stack[0])

    def test_seek_and_tell(self):
        backing = self.BytesIO(b"asdfghjkl")
        stream = self.tp(backing)

        self.assertEqual(b"as", stream.read(2))
        self.assertEqual(2, stream.tell())
        stream.seek(0, 0)
        self.assertEqual(b"asdf", stream.read(4))

        # Overwrite four bytes in the middle, then re-read from the start.
        stream.write(b"123f")
        stream.seek(0, 0)
        self.assertEqual(b"asdf123fl", stream.read())
        self.assertEqual(9, stream.tell())
        # Seek relative to the end, then relative to the current position.
        stream.seek(-4, 2)
        self.assertEqual(5, stream.tell())
        stream.seek(2, 1)
        self.assertEqual(7, stream.tell())
        self.assertEqual(b"fl", stream.read(11))
        stream.flush()
        self.assertEqual(b"asdf123fl", backing.getvalue())

        self.assertRaises(TypeError, stream.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario: ``read_func(bufio[, n])`` is the read primitive
        # being exercised between writes and flushes.
        backing = self.BytesIO(b"abcdefghi")
        stream = self.tp(backing)

        self.assertEqual(b"ab", read_func(stream, 2))
        stream.write(b"12")
        self.assertEqual(b"ef", read_func(stream, 2))
        self.assertEqual(6, stream.tell())
        stream.flush()
        self.assertEqual(6, stream.tell())
        self.assertEqual(b"ghi", read_func(stream))
        # Modify the raw stream behind the buffered object's back.
        backing.seek(0, 0)
        backing.write(b"XYZ")
        # flush() resets the read buffer
        stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(stream, 3))

    def test_flush_and_read(self):
        def _read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(_read)

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # A negative size means "everything": use an oversized target.
            size = n if n >= 0 else 9999
            target = bytearray(size)
            filled = bufio.readinto(target)
            return bytes(target[:filled])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = bufio.peek(n)
            if n != -1:
                data = data[:n]
            # peek() does not advance the position, so do it by hand.
            bufio.seek(len(data), 1)
            return data
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        backing = self.BytesIO(b"abcdefghi")
        stream = self.tp(backing)

        for chunk in (b"123", b"45"):
            stream.write(chunk)
            stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"12345fghi", backing.getvalue())
        self.assertEqual(b"12345fghi", stream.read())

    def test_threads(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_threads(self)

    def test_writes_and_peek(self):
        # First pass: peek at the current position between writes.
        def _peek_in_place(bufio):
            bufio.peek(1)
        self.check_writes(_peek_in_place)

        # Second pass: step back one byte, peek, then restore the position.
        def _peek_behind(bufio):
            saved = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(saved, 0)
        self.check_writes(_peek_behind)

    def test_writes_and_reads(self):
        def _step_back_and_read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_step_back_and_read)

    def test_writes_and_read1s(self):
        def _step_back_and_read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_step_back_and_read1)

    def test_writes_and_readintos(self):
        def _step_back_and_readinto(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_step_back_and_readinto)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            backing = self.BytesIO(b"A" * 10)
            stream = self.tp(backing, 4)
            end_pos = overwrite_size + 1
            # Trigger readahead
            self.assertEqual(stream.read(1), b"A")
            self.assertEqual(stream.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            stream.write(b"B" * overwrite_size)
            self.assertEqual(stream.tell(), end_pos)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            stream.flush()
            self.assertEqual(stream.tell(), end_pos)
            contents = backing.getvalue()
            self.assertEqual(
                contents,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for i in range(size):
            for j in range(i, size):
                backing = self.BytesIO(original)
                stream = self.tp(backing, 100)
                mutate(stream, i, j)
                stream.flush()
                # The later write at i wins when i == j.
                expected = bytearray(original)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(backing.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        backing = self.BytesIO(b"A" * 10)
        stream = self.tp(backing, 100)
        # the read buffer gets filled
        self.assertEqual(stream.read(2), b"AA")
        self.assertEqual(stream.truncate(), 2)
        # the write buffer increases
        self.assertEqual(stream.write(b"BB"), 2)
        self.assertEqual(stream.truncate(), 4)

    def test_misbehaved_io(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as backing, \
                self.tp(backing, 100) as stream:
            stream.write(b"1")
            self.assertEqual(stream.read(1), b'b')
            stream.write(b'2')
            self.assertEqual(stream.read1(1), b'd')
            stream.write(b'3')
            one_byte = bytearray(1)
            stream.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            stream.write(b'4')
            self.assertEqual(stream.peek(1), b'h')
            stream.flush()
            self.assertEqual(backing.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as backing, \
                self.tp(backing, 100) as stream:
            self.assertEqual(stream.read(1), b'a')
            stream.write(b"2")
            self.assertEqual(stream.read(1), b'c')
            stream.flush()
            self.assertEqual(backing.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as backing, \
                self.tp(backing) as stream:
            stream.write(b'1')
            self.assertEqual(stream.readline(), b'b\n')
            stream.write(b'2')
            self.assertEqual(stream.readline(), b'def\n')
            stream.write(b'3')
            self.assertEqual(stream.readline(), b'\n')
            stream.flush()
            self.assertEqual(backing.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
2357
R David Murray67bfe802013-02-23 21:51:05 -05002358
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # BufferedRandom tests bound to the ``io`` implementation, plus checks
    # that are only run against it.
    tp = io.BufferedRandom

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        if sys.maxsize <= 0x7FFFFFFF:
            # The allocation can succeed on 32-bit builds, e.g. with more
            # than 2 GiB RAM and a 64-bit kernel.
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            bufio.__init__(rawio, sys.maxsize)

    def test_garbage_collection(self):
        for parent in (CBufferedReaderTest, CBufferedWriterTest):
            parent.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
2382
2383
class PyBufferedRandomTest(BufferedRandomTest):
    # Run the shared BufferedRandom tests against the ``pyio`` implementation.
    tp = pyio.BufferedRandom
2386
2387
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002388# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2389# properties:
2390# - A single output character can correspond to many bytes of input.
2391# - The number of input bytes to complete the character can be
2392# undetermined until the last input byte is received.
2393# - The number of input bytes can vary depending on previous input.
2394# - A single input byte can correspond to many characters of output.
2395# - The number of output characters can be undetermined until the
2396# last input byte is received.
2397# - The number of output characters can vary depending on previous input.
2398
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        super().__init__(errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and no buffered input."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return ``(buffered_bytes, flags)``; flags is 0 right after reset()."""
        # XOR with 1 maps the initial I = O = 1 onto 0.
        i, o = self.i ^ 1, self.o ^ 1
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a ``(buffered_bytes, flags)`` pair produced by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input* and return the text completed so far.

        When *final* is true, whatever is still buffered is emitted as the
        last word.
        """
        # Collect the pieces and join once instead of growing a str.
        pieces = []
        for b in input:
            # self.i is re-read on every byte because process_word() may
            # change it in the middle of the input.
            if self.i == 0:  # variable-length, terminated with period
                if b == ord('.'):
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i<number>' and 'o<number>' words update I and O and produce no
        output; any other word is returned padded/truncated to O characters
        and followed by a period.  The buffer is emptied in every case.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o  # pad out with hyphens
            if self.o:
                output = output[:self.o]  # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # The codec lookup below answers only while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the 'test_decoder' codec.

        Returns a CodecInfo whose incremental decoder is this class when the
        codec is enabled and *name* is 'test_decoder'; otherwise None.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
2483
# Make the stateful test decoder reachable through the codec registry.
# Its lookup function answers nothing until a test enables the codec.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2487
2488
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.'),
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, final), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002531
2532class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002533
def setUp(self):
    # Start without a leftover scratch file, then prepare the sample data:
    # mixed line endings and the same text with "\n" endings only.
    support.unlink(support.TESTFN)
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002538
def tearDown(self):
    # Remove the scratch file a test may have left behind.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002541
def test_constructor(self):
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)

    # Re-initialising an existing wrapper must apply the new settings.
    text.__init__(buffered, encoding="latin-1", newline="\r\n")
    self.assertEqual(text.encoding, "latin-1")
    self.assertEqual(text.line_buffering, False)

    text.__init__(buffered, encoding="utf-8", line_buffering=True)
    self.assertEqual(text.encoding, "utf-8")
    self.assertEqual(text.line_buffering, True)
    self.assertEqual("\xe9\n", text.readline())

    # Invalid newline arguments: wrong type, then wrong value.
    with self.assertRaises(TypeError):
        text.__init__(buffered, newline=42)
    with self.assertRaises(ValueError):
        text.__init__(buffered, newline='xyzzy')
2555
def test_uninitialized(self):
    wrapper_cls = self.TextIOWrapper

    # Creating and dropping an object that never ran __init__ must be safe.
    text = wrapper_cls.__new__(wrapper_cls)
    del text

    text = wrapper_cls.__new__(wrapper_cls)
    with self.assertRaises(Exception):
        repr(text)
    with self.assertRaisesRegex((ValueError, AttributeError),
                                'uninitialized|has no attribute'):
        text.read(0)

    # Once initialised, the same object works normally.
    text.__init__(self.MockRawIO())
    self.assertEqual(text.read(0), '')
2566
def test_non_text_encoding_codecs_are_rejected(self):
    # Ensure the constructor complains if passed a codec that isn't
    # marked as a text encoding
    # http://bugs.python.org/issue20404
    buffered = self.BufferedWriter(self.BytesIO())
    with self.assertRaisesRegex(LookupError, "is not a text encoding"):
        self.TextIOWrapper(buffered, encoding="hex")
2575
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)

    # detach() hands back the wrapped buffer object itself.
    text = self.TextIOWrapper(buffered)
    self.assertIs(text.detach(), buffered)

    # Pending text is pushed down to the raw stream when detaching, and a
    # second detach() is an error.
    text = self.TextIOWrapper(buffered, encoding="ascii")
    text.write("howdy")
    self.assertFalse(raw.getvalue())
    text.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    with self.assertRaises(ValueError):
        text.detach()

    # Operations independent of the detached stream should still work
    repr(text)
    self.assertEqual(text.encoding, "ascii")
    self.assertEqual(text.errors, "strict")
    self.assertFalse(text.line_buffering)
    self.assertFalse(text.write_through)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002595
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    # The repr grows as name and mode become available.
    self.assertEqual(repr(text),
                     "<%s.TextIOWrapper encoding='utf-8'>" % modname)

    raw.name = "dummy"
    self.assertEqual(repr(text),
                     "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)

    text.mode = "r"
    self.assertEqual(repr(text),
                     "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)

    raw.name = b"dummy"
    self.assertEqual(repr(text),
                     "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)

    text.buffer.detach()
    repr(text)  # Should not raise an exception
2615
def test_recursive_repr(self):
    # Issue #25455
    raw = self.BytesIO()
    text = self.TextIOWrapper(raw)
    # Point the raw stream's name back at the wrapper so repr() recurses.
    with support.swap_attr(raw, 'name', text):
        try:
            repr(text)  # Should not crash
        except RuntimeError:
            # A RuntimeError is an accepted outcome here.
            pass
2625
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)

    text.write("X")
    self.assertEqual(raw.getvalue(), b"")  # No flush happened
    text.write("Y\nZ")
    self.assertEqual(raw.getvalue(), b"XY\nZ")  # All got flushed
    text.write("A\rB")
    self.assertEqual(raw.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002636
def test_reconfigure_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=False)

    text.write("AB\nC")
    self.assertEqual(raw.getvalue(), b"")

    text.reconfigure(line_buffering=True)   # implicit flush
    self.assertEqual(raw.getvalue(), b"AB\nC")
    text.write("DEF\nG")
    self.assertEqual(raw.getvalue(), b"AB\nCDEF\nG")
    text.write("H")
    self.assertEqual(raw.getvalue(), b"AB\nCDEF\nG")

    text.reconfigure(line_buffering=False)  # implicit flush
    self.assertEqual(raw.getvalue(), b"AB\nCDEF\nGH")
    text.write("IJ")
    self.assertEqual(raw.getvalue(), b"AB\nCDEF\nGH")

    # Keeping default value
    text.reconfigure()
    text.reconfigure(line_buffering=None)
    self.assertEqual(text.line_buffering, False)

    text.reconfigure(line_buffering=True)
    text.reconfigure()
    text.reconfigure(line_buffering=None)
    self.assertEqual(text.line_buffering, True)
2663
@unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
def test_default_encoding(self):
    saved_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding to check that TextIOWrapper() uses the current
        # locale encoding and not the user preferred encoding
        for variable in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(variable, None)

        expected_encoding = locale.getpreferredencoding(False)
        text = self.TextIOWrapper(self.BytesIO())
        self.assertEqual(text.encoding, expected_encoding)
    finally:
        # Put the environment back exactly as it was.
        os.environ.clear()
        os.environ.update(saved_environ)
2682
@support.cpython_only
@unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
def test_device_encoding(self):
    # Issue 15989
    import _testcapi
    stream = self.BytesIO()
    # A fileno() one past each C integer limit must raise OverflowError.
    for limit in (_testcapi.INT_MAX, _testcapi.UINT_MAX):
        stream.fileno = lambda limit=limit: limit + 1
        with self.assertRaises(OverflowError):
            self.TextIOWrapper(stream)
2693
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()

    explicit = self.TextIOWrapper(raw, encoding="utf-8")
    self.assertEqual(explicit.encoding, "utf-8")

    implicit = self.TextIOWrapper(raw)
    self.assertIsNotNone(implicit.encoding)
    # lookup() raises if the reported name is not a known codec.
    codecs.lookup(implicit.encoding)
2702
def test_encoding_errors_reading(self):
    payload = b"abc\n\xff\n"

    # (1) default and (2) explicit strict: the bad byte is an error
    for extra in ({}, {"errors": "strict"}):
        text = self.TextIOWrapper(self.BytesIO(payload),
                                  encoding="ascii", **extra)
        self.assertRaises(UnicodeError, text.read)

    # (3) ignore and (4) replace: the bad byte is dropped / substituted
    lenient = (
        ("ignore", "abc\n\n"),
        ("replace", "abc\n\ufffd\n"),
    )
    for policy, expected in lenient:
        text = self.TextIOWrapper(self.BytesIO(payload),
                                  encoding="ascii", errors=policy)
        self.assertEqual(text.read(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002720
def test_encoding_errors_writing(self):
    # (1) default and (2) explicit strict: unencodable text is an error
    for extra in ({}, {"errors": "strict"}):
        text = self.TextIOWrapper(self.BytesIO(), encoding="ascii", **extra)
        self.assertRaises(UnicodeError, text.write, "\xff")

    # (3) ignore and (4) replace: the character is dropped / substituted
    lenient = (
        ("ignore", b"abcdef\n"),
        ("replace", b"abc?def\n"),
    )
    for policy, expected in lenient:
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", errors=policy,
                                  newline="\n")
        text.write("abc\xffdef\n")
        text.flush()
        self.assertEqual(raw.getvalue(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002744
def test_newlines(self):
    input_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]

    # (newline argument, lines expected when reading with it)
    tests = [
        [None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']],
        ['', input_lines],
        ['\n', ["unix\n", "windows\r\n", "os9\rlast\n", "nonl"]],
        ['\r\n', ["unix\nwindows\r\n", "os9\rlast\nnonl"]],
        ['\r', ["unix\nwindows\r", "\nos9\r", "last\nnonl"]],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )
    joined_text = ''.join(input_lines)

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        data = joined_text.encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        # Mix read(2) with readline() until EOF.
                        got_lines = []
                        for head in iter(lambda: textio.read(2), ''):
                            self.assertEqual(len(head), 2)
                            got_lines.append(head + textio.readline())
                    else:
                        got_lines = list(textio)

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002786
def test_newlines_input(self):
    # Decode one byte string under each ``newline`` setting and check both
    # the per-line split (readlines) and a whole-stream read().
    raw = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    # Same payload with every "\r\n" and lone "\r" rewritten to "\n".
    translated = raw.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    cases = [
        (None, translated.decode("ascii").splitlines(keepends=True)),
        ("", raw.decode("ascii").splitlines(keepends=True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected_lines in cases:
        source = self.BytesIO(raw)
        stream = self.TextIOWrapper(source, encoding="ascii",
                                    newline=newline)
        self.assertEqual(stream.readlines(), expected_lines)
        # Rewind and confirm a single read() yields the same text.
        stream.seek(0)
        self.assertEqual(stream.read(), "".join(expected_lines))
Guido van Rossum8358db22007-08-18 21:39:55 +00002802
def test_newlines_output(self):
    # Write the same text under each ``newline`` setting and compare the
    # bytes that reach the underlying buffer.
    expected_by_newline = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is checked against the entry keyed by os.linesep.
    cases = [(None, expected_by_newline[os.linesep])]
    cases.extend(sorted(expected_by_newline.items()))
    # The text is written in three pieces; the first split falls mid-line.
    pieces = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in cases:
        sink = self.BytesIO()
        writer = self.TextIOWrapper(sink, encoding="ascii", newline=newline)
        for piece in pieces:
            writer.write(piece)
        writer.flush()
        self.assertEqual(sink.closed, False)
        self.assertEqual(sink.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002820
def test_destructor(self):
    # Dropping the only reference to a TextIOWrapper should close the
    # wrapped buffer, and the written text should be in it by then.
    seen_at_close = []

    class RecordingBytesIO(self.BytesIO):
        def close(self):
            # Record the buffer contents at the moment close() is invoked.
            seen_at_close.append(self.getvalue())
            super().close()

    raw = RecordingBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002834
def test_override_destructor(self):
    # A subclass that overrides __del__, close() and flush() should see
    # them invoked in exactly that order when the object is finalized.
    calls = []

    class TracingTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # Chain to the parent finalizer only when the base has one.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    raw = self.BytesIO()
    wrapper = TracingTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2857
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        # Build a throwaway wrapper and read an attribute it lacks.
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    output = captured.getvalue().strip()
    if not output:
        # Nothing was reported on stderr; nothing further to verify.
        return

    # The destructor *may* have printed an unraisable error, check it
    lines = output.splitlines()
    if DESTRUCTOR_LOG_ERRORS:
        self.assertEqual(len(lines), 5)
        self.assertTrue(lines[0].startswith("Exception ignored in: "), lines)
        self.assertEqual(lines[1], "Traceback (most recent call last):", lines)
        self.assertEqual(lines[4], 'OSError:', lines)
    else:
        self.assertEqual(len(lines), 1)
        last = lines[-1]
        self.assertTrue(last.startswith("Exception OSError: "), lines)
        self.assertTrue(last.endswith(" ignored"), lines)
Guido van Rossum8358db22007-08-18 21:39:55 +00002879
Guido van Rossum9b76da62007-04-11 01:09:03 +00002880 # Systematic tests of the text I/O API
2881
def test_basic_io(self):
    # Round-trip a short string through a real file for a range of chunk
    # sizes and encodings, checking write/read/seek/tell at every step.
    #
    # Both handles are managed by ``with`` so that a failing assertion
    # cannot leave support.TESTFN open (the previous open()/close() pairs
    # skipped close() whenever an assertion raised).
    chunk_sizes = (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65)
    # NOTE: "utf-16-be" and "utf-16-le" are intentionally not listed; they
    # were already commented out of this test (reason not recorded here).
    encodings = ("ascii", "latin-1", "utf-8")
    for chunksize in chunk_sizes:
        for enc in encodings:
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)

            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position reported after reading the whole file.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2910
def multi_line_test(self, f, enc):
    # Helper (not collected as a test): empty ``f``, write lines of
    # assorted lengths while recording tell() before each write, then
    # read them back and require identical (position, line) pairs.
    # ``enc`` is accepted but not used by this helper.
    f.seek(0)
    f.truncate()
    alphabet = "s\xff\u0fff\uffff"
    sizes = (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000)

    written = []
    for size in sizes:
        # Cycle through the sample characters to build a line of `size`.
        text = "".join(alphabet[k % len(alphabet)] for k in range(size))
        text += "\n"
        written.append((f.tell(), text))
        f.write(text)

    f.seek(0)
    read_back = []
    # tell() is taken before each readline(), mirroring the write side.
    pos, text = f.tell(), f.readline()
    while text:
        read_back.append((pos, text))
        pos, text = f.tell(), f.readline()
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002932
def test_telling(self):
    # tell() must report the same positions when reading as it did when
    # writing, and must raise OSError while the file is being iterated.
    #
    # The file is managed by ``with`` so that a failing assertion cannot
    # leave support.TESTFN open (the old trailing f.close() was skipped
    # whenever an assertion raised).
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()

        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)

        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to be refused in the middle of iteration.
            self.assertRaises(OSError, f.tell)
        # Once iteration has finished, tell() works again.
        self.assertEqual(f.tell(), p2)
2952
def test_seeking(self):
    # Build a line whose ASCII prefix is two characters shorter than the
    # default chunk size, followed by a multi-byte character and "\n",
    # then check read()/tell()/readline() around that point.
    prefix_size = _default_chunk_size() - 2
    prefix_text = "a" * prefix_size
    prefix_bytes = prefix_text.encode("utf-8")
    # Pure ASCII: one byte per character.
    self.assertEqual(len(prefix_text), len(prefix_bytes))
    suffix_text = "\u8888\n"
    encoded_line = prefix_bytes + suffix_text.encode("utf-8")

    with self.open(support.TESTFN, "wb") as out:
        out.write(encoded_line * 2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as reader:
        head = reader.read(prefix_size)
        self.assertEqual(head, prefix_bytes.decode("ascii"))
        self.assertEqual(reader.tell(), prefix_size)
        self.assertEqual(reader.readline(), suffix_text)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002969
def test_seeking_too(self):
    # Regression test for a specific bug
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as out:
        out.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as reader:
        reader._CHUNK_SIZE  # Just test that it exists
        # A chunk size of 2 is smaller than the 3-byte character above.
        reader._CHUNK_SIZE = 2
        reader.readline()
        # The result is not checked; the call only has to succeed.
        reader.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002980
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    #
    # Every file handle is managed by ``with`` so a failing assertion
    # cannot leave support.TESTFN open, and the loop variable no longer
    # shadows the ``input`` builtin.

    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def check_seek_and_tell(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Going back to the cookie must replay the same tail.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for encoded, _, _ in StatefulIncrementalDecoderTest.test_cases:
            check_seek_and_tell(encoded)

        # Position each test case so that it crosses a chunk boundary.
        for encoded, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(encoded)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            check_seek_and_tell(prefix + encoded, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003027
def test_multibyte_seek_and_tell(self):
    # A tell() cookie taken between two lines of an euc_jp file must let
    # seek() return to that spot and re-read the multi-byte line.
    #
    # Both handles are managed by ``with`` so a failing assertion cannot
    # leave support.TESTFN open.
    with self.open(support.TESTFN, "w", encoding="euc_jp") as f:
        f.write("AB\n\u3046\u3048\n")

    with self.open(support.TESTFN, "r", encoding="euc_jp") as f:
        self.assertEqual(f.readline(), "AB\n")
        p0 = f.tell()
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        p1 = f.tell()
        # Rewind to the start of the second line and read it again.
        f.seek(p0)
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        self.assertEqual(f.tell(), p1)
3042
def test_seek_with_encoder_state(self):
    # Write with euc_jis_2004, seek back to a cookie taken mid-stream,
    # write again, and check what a reader sees afterwards.
    #
    # Both handles are managed by ``with`` so a failing assertion (or a
    # failing write/seek) cannot leave support.TESTFN open.
    with self.open(support.TESTFN, "w", encoding="euc_jis_2004") as f:
        f.write("\u00e6\u0300")
        p0 = f.tell()
        f.write("\u00e6")
        # Return to the position recorded before the lone "\u00e6".
        f.seek(p0)
        f.write("\u0300")

    with self.open(support.TESTFN, "r", encoding="euc_jis_2004") as f:
        self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
3055
def test_encoded_writes(self):
    # Two consecutive writes through a UTF-16/UTF-32 wrapper must read
    # back as the doubled text and match a one-shot encode() byte for byte.
    data = "1234567890"
    doubled = data * 2
    encodings = (
        "utf-16",
        "utf-16-le",
        "utf-16-be",
        "utf-32",
        "utf-32-le",
        "utf-32-be",
    )
    for encoding in encodings:
        raw = self.BytesIO()
        stream = self.TextIOWrapper(raw, encoding=encoding)
        # Check if the BOM is written only once (see issue1753).
        stream.write(data)
        stream.write(data)
        # Read the content back twice, rewinding before each pass.
        for _ in range(2):
            stream.seek(0)
            self.assertEqual(stream.read(), doubled)
        self.assertEqual(raw.getvalue(), doubled.encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003075
def test_unreadable(self):
    # read() on a wrapper whose buffer reports readable() == False
    # must raise OSError.
    class WriteOnlyBytesIO(self.BytesIO):
        def readable(self):
            return False

    wrapper = self.TextIOWrapper(WriteOnlyBytesIO())
    with self.assertRaises(OSError):
        wrapper.read()
Benjamin Petersona1b49012009-03-31 23:11:32 +00003082
def test_read_one_by_one(self):
    # Reading one character at a time must still collapse "\r\n" to "\n".
    txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    # iter(callable, sentinel) keeps calling read(1) until it returns "".
    collected = "".join(iter(lambda: txt.read(1), ""))
    self.assertEqual(collected, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003092
def test_readlines(self):
    # readlines() with no hint, with None, and with a hint of 5.
    stream = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]

    self.assertEqual(stream.readlines(), every_line)
    stream.seek(0)
    self.assertEqual(stream.readlines(None), every_line)
    stream.seek(0)
    # With a hint of 5 only the first two lines are expected back.
    self.assertEqual(stream.readlines(5), ["AA\n", "BB\n"])
3100
# read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    payload = b"A" * 127 + b"\r\nB"
    txt = self.TextIOWrapper(self.BytesIO(payload))
    # Keep requesting 128 characters until read() returns "".
    collected = "".join(iter(lambda: txt.read(128), ""))
    self.assertEqual(collected, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003112
def test_writelines(self):
    # writelines() with a plain list writes the items back to back.
    sink = self.BytesIO()
    writer = self.TextIOWrapper(sink)
    writer.writelines(['ab', 'cd', 'ef'])
    writer.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
3120
def test_writelines_userlist(self):
    # Same as test_writelines, but the lines come from a UserList
    # instead of a built-in list.
    lines = UserList(['ab', 'cd', 'ef'])
    sink = self.BytesIO()
    writer = self.TextIOWrapper(sink)
    writer.writelines(lines)
    writer.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
3128
def test_writelines_error(self):
    # writelines() must raise TypeError for each of these arguments:
    # a list of ints, None, and a bytes object.
    txt = self.TextIOWrapper(self.BytesIO())
    for bad_lines in ([1, 2, 3], None, b'abc'):
        with self.assertRaises(TypeError):
            txt.writelines(bad_lines)
3134
def test_issue1395_1(self):
    # Reading self.testdata one character at a time must produce
    # self.normalized.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

    # read one char at a time
    collected = "".join(iter(lambda: txt.read(1), ""))
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003146
def test_issue1395_2(self):
    # With the chunk size forced to 4, reading four characters at a
    # time must produce self.normalized.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Keep requesting 4 characters until read() returns "".
    collected = "".join(iter(lambda: txt.read(4), ""))
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003158
def test_issue1395_3(self):
    # Mix two read(4) calls with three readline() calls; the pieces
    # joined in call order must equal self.normalized.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # List elements are evaluated left to right, i.e. in call order.
    pieces = [
        txt.read(4),
        txt.read(4),
        txt.readline(),
        txt.readline(),
        txt.readline(),
    ]
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003169
def test_issue1395_4(self):
    # A read(4) followed by an unbounded read() must together return
    # self.normalized.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    head = txt.read(4)
    tail = txt.read()
    self.assertEqual(head + tail, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003177
def test_issue1395_5(self):
    # After read(4), tell() must return a cookie that seek() can use to
    # come back to the same spot even after an intervening seek(0).
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Only the side effect of advancing the stream matters here.
    txt.read(4)
    cookie = txt.tell()
    txt.seek(0)
    txt.seek(cookie)
    self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003187
def test_issue2282(self):
    # The wrapper's seekable() must agree with its buffer's seekable().
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    self.assertEqual(raw.seekable(), wrapper.seekable())
3193
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            # The position is not needed here; the call itself is kept.
            writer.tell()
        with self.open(path, 'rb') as reader:
            self.assertEqual(reader.read(), 'aaa'.encode(charset))

        with self.open(path, 'a', encoding=charset) as appender:
            appender.write('xxx')
        with self.open(path, 'rb') as reader:
            self.assertEqual(reader.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003208
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            end_cookie = writer.tell()
        with self.open(path, 'r+', encoding=charset) as editor:
            # Add after the existing text first, then overwrite the start.
            editor.seek(end_cookie)
            editor.write('zzz')
            editor.seek(0)
            editor.write('bbb')
        with self.open(path, 'rb') as reader:
            self.assertEqual(reader.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003223
def test_seek_append_bom(self):
    # Same test, but first seek to the start and then to the end
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
        with self.open(path, 'a', encoding=charset) as appender:
            appender.seek(0)
            appender.seek(0, self.SEEK_END)
            appender.write('xxx')
        with self.open(path, 'rb') as reader:
            self.assertEqual(reader.read(), 'aaaxxx'.encode(charset))
3236
def test_errors_property(self):
    # ``errors`` should read "strict" when not given and echo the
    # value passed to open() otherwise.
    with self.open(support.TESTFN, "w") as default_file:
        self.assertEqual(default_file.errors, "strict")
    with self.open(support.TESTFN, "w", errors="replace") as lenient_file:
        self.assertEqual(lenient_file.errors, "replace")
3242
@support.no_tracing
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    worker_count = 20
    go = threading.Event()

    def line_for(index):
        return "Thread%03d\n" % index

    with self.open(support.TESTFN, "w", buffering=1) as out:
        def worker(index):
            # Prepare the text first, then block until all are released.
            text = line_for(index)
            go.wait()
            out.write(text)

        workers = [threading.Thread(target=worker, args=(index,))
                   for index in range(worker_count)]
        # go.set is handed to start_threads as its second argument.
        with support.start_threads(workers, go.set):
            time.sleep(0.02)

    with self.open(support.TESTFN) as written:
        content = written.read()
        for index in range(worker_count):
            # Each thread's line must appear exactly once.
            self.assertEqual(content.count(line_for(index)), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003260
def test_flush_error_on_close(self):
    # Test that text file is closed despite failed flush
    # and that flush() is called before file closed.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    # Filled in by failing_flush with [txt.closed, txt.buffer.closed].
    state_at_flush = []

    def failing_flush():
        state_at_flush[:] = [txt.closed, txt.buffer.closed]
        raise OSError()

    txt.flush = failing_flush
    with self.assertRaises(OSError):  # exception not swallowed
        txt.close()
    self.assertTrue(txt.closed)
    self.assertTrue(txt.buffer.closed)
    self.assertTrue(state_at_flush)  # flush() called
    wrapper_was_closed, buffer_was_closed = state_at_flush
    self.assertFalse(wrapper_was_closed)  # flush() called before file closed
    self.assertFalse(buffer_was_closed)
    txt.flush = lambda: None  # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003277
def test_close_error_on_close(self):
    # When both flush() and the buffer's close() fail, close() must
    # raise the 'close' error with the 'flush' error as its __context__.
    def failing_flush():
        raise OSError('flush')

    def failing_close():
        raise OSError('close')

    raw = self.BytesIO(self.testdata)
    raw.close = failing_close
    txt = self.TextIOWrapper(raw, encoding="ascii")
    txt.flush = failing_flush

    with self.assertRaises(OSError) as caught:  # exception not swallowed
        txt.close()
    error = caught.exception
    self.assertEqual(error.args, ('close',))
    self.assertIsInstance(error.__context__, OSError)
    self.assertEqual(error.__context__.args, ('flush',))
    self.assertFalse(txt.closed)

    # Silence destructor error
    raw.close = lambda: None
    txt.flush = lambda: None
3297
def test_nonnormalized_close_error_on_close(self):
    # Issue #21677
    # Same scenario as a failing flush plus a failing close, except both
    # failures are NameErrors triggered by referencing undefined names.
    def failing_flush():
        # Deliberately undefined name: evaluating it raises NameError.
        raise non_existing_flush

    def failing_close():
        # Deliberately undefined name: evaluating it raises NameError.
        raise non_existing_close

    raw = self.BytesIO(self.testdata)
    raw.close = failing_close
    txt = self.TextIOWrapper(raw, encoding="ascii")
    txt.flush = failing_flush

    with self.assertRaises(NameError) as caught:  # exception not swallowed
        txt.close()
    error = caught.exception
    self.assertIn('non_existing_close', str(error))
    self.assertIsInstance(error.__context__, NameError)
    self.assertIn('non_existing_flush', str(error.__context__))
    self.assertFalse(txt.closed)

    # Silence destructor error
    raw.close = lambda: None
    txt.flush = lambda: None
3318
def test_multi_close(self):
    # close() may be called repeatedly; flush() afterwards must raise
    # ValueError.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    for _ in range(3):
        txt.close()
    with self.assertRaises(ValueError):
        txt.flush()
3325
def test_unseekable(self):
    # tell() and seek() on a wrapper around MockUnseekableIO must raise
    # UnsupportedOperation.
    txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        txt.tell()
    with self.assertRaises(self.UnsupportedOperation):
        txt.seek(0)
3330
def test_readonly_attributes(self):
    # Assigning to ``buffer`` on an existing wrapper must raise
    # AttributeError.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    with self.assertRaises(AttributeError):
        txt.buffer = replacement
3336
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    txt = self.TextIOWrapper(self.MockRawIO(chunks),
                             encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(txt.read(4), 'abcd')
    self.assertEqual(txt.readline(), 'efghi\n')
    remaining = [line for line in txt]
    self.assertEqual(remaining, ['jkl\n', 'opq\n'])
3347
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                             write_through=True)
    for piece in ('1', '23\n4', '5'):
        txt.write(piece)
    # No flush() call: the bytes must already be on the raw object.
    self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3357
def test_bufio_write_through(self):
    # Issue #21396: write_through=True doesn't force a flush()
    # on the underlying binary buffered object.
    flush_calls = []
    write_calls = []

    class RecordingWriter(self.BufferedWriter):
        # Notes every flush()/write() call, then defers to the base class.
        def flush(self, *args, **kwargs):
            flush_calls.append(True)
            return super().flush(*args, **kwargs)

        def write(self, *args, **kwargs):
            write_calls.append(True)
            return super().write(*args, **kwargs)

    rawio = self.BytesIO()
    data = b"a"
    # Buffer is twice the payload size, so one write cannot overflow it.
    bufio = RecordingWriter(rawio, len(data)*2)
    textio = self.TextIOWrapper(bufio, encoding='ascii',
                                write_through=True)
    # write to the buffered io but don't overflow the buffer
    text = data.decode('ascii')
    textio.write(text)

    # buffer.flush is not called with write_through=True
    self.assertFalse(flush_calls)
    # buffer.write *is* called with write_through=True
    self.assertTrue(write_calls)
    self.assertEqual(rawio.getvalue(), b"")  # no flush

    write_calls.clear()  # reset
    textio.write(text * 10)  # total content is larger than bufio buffer
    self.assertTrue(write_calls)
    self.assertEqual(rawio.getvalue(), data * 11)  # all flushed
3389
def test_reconfigure_write_through(self):
    # Toggle write_through with reconfigure() and check both the
    # attribute value and what has reached the raw object at each step.
    raw = self.MockRawIO([])
    txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')

    def written():
        # Everything the raw object has been handed so far.
        return b''.join(raw._write_stack)

    txt.write('1')
    txt.reconfigure(write_through=True)  # implied flush
    self.assertEqual(txt.write_through, True)
    self.assertEqual(written(), b'1')
    txt.write('23')
    self.assertEqual(written(), b'123')

    txt.reconfigure(write_through=False)
    self.assertEqual(txt.write_through, False)
    txt.write('45')
    txt.flush()
    self.assertEqual(written(), b'12345')

    # Keeping default value
    txt.reconfigure()
    txt.reconfigure(write_through=None)
    self.assertEqual(txt.write_through, False)
    txt.reconfigure(write_through=True)
    txt.reconfigure()
    txt.reconfigure(write_through=None)
    self.assertEqual(txt.write_through, True)
3412
def test_read_nonbytes(self):
    # Issue #17106
    # Crash when underlying read() returns non-bytes
    attempts = (
        ("read", (1,)),
        ("readline", ()),
        ("read", ()),
    )
    for method_name, args in attempts:
        # A fresh wrapper around a StringIO (a text stream) per attempt.
        wrapper = self.TextIOWrapper(self.StringIO('a'))
        self.assertRaises(TypeError, getattr(wrapper, method_name), *args)
3422
def test_illegal_encoder(self):
    # Issue 31271: Calling write() while the return value of encoder's
    # encode() is invalid shouldn't cause an assertion failure.
    #
    # The wrapper and buffer are built from self.TextIOWrapper and
    # self.BytesIO, not the ``io`` module, so the test exercises
    # whichever implementation the enclosing test class is bound to
    # instead of always testing ``io``.
    rot13 = codecs.lookup("rot13")
    # Temporarily flag rot13 as a text encoding so the constructor
    # accepts it; swap_attr restores the attribute on exit.
    with support.swap_attr(rot13, '_is_text_encoding', True):
        t = self.TextIOWrapper(self.BytesIO(b'foo'), encoding="rot13")
    self.assertRaises(TypeError, t.write, 'bar')
3430
def test_illegal_decoder(self):
    # Issue #17106
    def _make_illegal_wrapper():
        # Bypass the early encoding check added in issue 20404
        codec_info = codecs.lookup("quopri")
        codec_info._is_text_encoding = True
        try:
            return self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
                                      newline='\n', encoding="quopri")
        finally:
            # Always put the flag back, even if construction failed.
            codec_info._is_text_encoding = False

    # Crash when decoder returns non-string
    attempts = (
        ("read", (1,)),
        ("readline", ()),
        ("read", ()),
    )
    for method_name, args in attempts:
        wrapper = _make_illegal_wrapper()
        self.assertRaises(TypeError, getattr(wrapper, method_name), *args)

    # Issue 31243: calling read() while the return value of decoder's
    # getstate() is invalid should neither crash the interpreter nor
    # raise a SystemError.
    def _make_very_illegal_wrapper(getstate_ret_val):
        class BadDecoder:
            def getstate(self):
                return getstate_ret_val

        def _get_bad_decoder(dummy):
            return BadDecoder()

        codec_info = codecs.lookup("quopri")
        # The wrapper is built while quopri's incrementaldecoder is
        # replaced by the BadDecoder factory.
        with support.swap_attr(codec_info, 'incrementaldecoder',
                               _get_bad_decoder):
            return _make_illegal_wrapper()

    for bad_state in (42, (), (1, 2)):
        wrapper = _make_very_illegal_wrapper(bad_state)
        self.assertRaises(TypeError, wrapper.read, 42)
3470
def _check_create_at_shutdown(self, **kwargs):
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    #
    # The child script is run with "-c"; **kwargs is rendered into the
    # TextIOWrapper() call made from C.__del__.  Returns whatever
    # assert_python_ok() returns.
    script = """if 1:
            import codecs
            import {iomod} as io

            # Avoid looking up codecs at shutdown
            codecs.lookup('utf-8')

            class C:
                def __init__(self):
                    self.buf = io.BytesIO()
                def __del__(self):
                    io.TextIOWrapper(self.buf, **{kwargs})
                    print("ok")
            c = C()
            """.format(iomod=self.io.__name__, kwargs=kwargs)
    return assert_python_ok("-c", script)
3491
@support.requires_type_collecting
def test_create_at_shutdown_without_encoding(self):
    # No keyword arguments are passed to the wrapper built at shutdown.
    _rc, stdout, stderr = self._check_create_at_shutdown()
    if not stderr:
        self.assertEqual("ok", stdout.decode().strip())
        return
    # Can error out with a RuntimeError if the module state
    # isn't found.
    self.assertIn(self.shutdown_error, stderr.decode())
3501
@support.requires_type_collecting
def test_create_at_shutdown_with_encoding(self):
    # With encoding and errors spelled out, the child must print "ok"
    # and leave stderr empty.
    wrapper_kwargs = dict(encoding='utf-8', errors='strict')
    _rc, stdout, stderr = self._check_create_at_shutdown(**wrapper_kwargs)
    self.assertFalse(stderr)
    self.assertEqual("ok", stdout.decode().strip())
3508
def test_read_byteslike(self):
    # The underlying stream hands back memoryviews instead of bytes.
    source = MemviewBytesIO(b'Just some random string\n')
    wrapper = self.TextIOWrapper(source, 'utf-8')

    # TextIOWrapper will not read the full string, because
    # we truncate it to a multiple of the native int size
    # so that we can construct a more complex memoryview.
    truncated = _to_memoryview(source.getvalue()).tobytes()
    expected_text = truncated.decode('utf-8')

    self.assertEqual(wrapper.read(200), expected_text)
3519
def test_issue22849(self):
    # A stand-in stream that offers only the three capability queries.
    class PartialStream(object):
        def readable(self):
            return True

        def writable(self):
            return True

        def seekable(self):
            return True

    # Ten construction attempts; any exception they raise is
    # deliberately ignored.
    for _attempt in range(10):
        try:
            self.TextIOWrapper(PartialStream(), encoding='utf-8')
        except Exception:
            pass

    # Once tell() is available, one more construction is made with no
    # guard around it.
    PartialStream.tell = lambda x: 0
    wrapper = self.TextIOWrapper(PartialStream(), encoding='utf-8')
3534
def test_reconfigure_encoding_read(self):
    # latin1 -> utf8
    # (latin1 can decode utf-8 encoded string)
    payload = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
    wrapper = self.TextIOWrapper(self.BytesIO(payload),
                                 encoding='latin1', newline='\n')
    self.assertEqual(wrapper.readline(), 'abc\xe9\n')
    # After a line has been read, both reconfigure() calls below are
    # expected to be refused.
    for rejected in ({'encoding': 'utf-8'}, {'newline': None}):
        with self.assertRaises(self.UnsupportedOperation):
            wrapper.reconfigure(**rejected)
3546
def test_reconfigure_write_fromascii(self):
    # ascii has a specific encodefunc in the C implementation,
    # but utf-8-sig has not. Make sure that we get rid of the
    # cached encodefunc when we switch encoders.
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink, encoding='ascii', newline='\n')
    wrapper.write('foo\n')
    wrapper.reconfigure(encoding='utf-8-sig')
    wrapper.write('\xe9\n')
    wrapper.flush()
    written = sink.getvalue()
    self.assertEqual(written, b'foo\n\xc3\xa9\n')
3558
def test_reconfigure_write(self):
    # latin -> utf8
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink, encoding='latin1', newline='\n')
    wrapper.write('abc\xe9\n')
    wrapper.reconfigure(encoding='utf-8')
    # No explicit flush() here: the first chunk is expected to be in
    # the raw stream already, still latin1-encoded.
    self.assertEqual(sink.getvalue(), b'abc\xe9\n')
    wrapper.write('d\xe9f\n')
    wrapper.flush()
    self.assertEqual(sink.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')

    # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
    # the file
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink, encoding='ascii', newline='\n')
    wrapper.write('abc\n')
    wrapper.reconfigure(encoding='utf-8-sig')
    wrapper.write('d\xe9f\n')
    wrapper.flush()
    self.assertEqual(sink.getvalue(), b'abc\nd\xc3\xa9f\n')
3579
def test_reconfigure_write_non_seekable(self):
    # Make the raw stream report itself as non-seekable and remove
    # its seek() altogether.
    sink = self.BytesIO()
    sink.seekable = lambda: False
    sink.seek = None
    wrapper = self.TextIOWrapper(sink, encoding='ascii', newline='\n')
    wrapper.write('abc\n')
    wrapper.reconfigure(encoding='utf-8-sig')
    wrapper.write('d\xe9f\n')
    wrapper.flush()

    # If the raw stream is not seekable, there'll be a BOM
    written = sink.getvalue()
    self.assertEqual(written, b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3592
def test_reconfigure_defaults(self):
    wrapper = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')

    def expect_settings(encoding, errors):
        # Both attributes are checked after each partial reconfigure.
        self.assertEqual(wrapper.encoding, encoding)
        self.assertEqual(wrapper.errors, errors)

    # encoding=None keeps the current encoding and error handler.
    wrapper.reconfigure(encoding=None)
    expect_settings('ascii', 'replace')
    wrapper.write('LF\n')

    # Changing only the newline leaves encoding and errors alone.
    wrapper.reconfigure(newline='\r\n')
    expect_settings('ascii', 'replace')

    # Changing only the error handler leaves the encoding alone.
    wrapper.reconfigure(errors='ignore')
    expect_settings('ascii', 'ignore')
    wrapper.write('CRLF\n')

    # A new encoding given without errors yields 'strict'.
    wrapper.reconfigure(encoding='utf-8', newline=None)
    self.assertEqual(wrapper.errors, 'strict')
    wrapper.seek(0)
    self.assertEqual(wrapper.read(), 'LF\nCRLF\n')

    raw_bytes = wrapper.detach().getvalue()
    self.assertEqual(raw_bytes, b'LF\nCRLF\r\n')
3615
def test_reconfigure_newline(self):
    # Reading: the newline mode chosen by reconfigure() decides where
    # the first readline() stops.
    read_cases = (
        # (raw bytes, initial newline, new newline, first line read)
        (b'CR\rEOF', '\n', None, 'CR\n'),
        (b'CR\rEOF', '\n', '', 'CR\r'),
        (b'CR\rLF\nEOF', '\r', '\n', 'CR\rLF\n'),
        (b'LF\nCR\rEOF', '\n', '\r', 'LF\nCR\r'),
        (b'CR\rCRLF\r\nEOF', '\r', '\r\n', 'CR\rCRLF\r\n'),
    )
    for data, initial, changed, first_line in read_cases:
        reader = self.TextIOWrapper(self.BytesIO(data), 'ascii',
                                    newline=initial)
        reader.reconfigure(newline=changed)
        self.assertEqual(reader.readline(), first_line)

    # Writing: each chunk is written under the newline mode that was
    # configured immediately before it.
    writer = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
    write_steps = (
        (None, 'linesep\n'),
        ('', 'LF\n'),
        ('\n', 'LF\n'),
        ('\r', 'CR\n'),
        ('\r\n', 'CRLF\n'),
    )
    for mode, text in write_steps:
        writer.reconfigure(newline=mode)
        writer.write(text)
    expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
    self.assertEqual(writer.detach().getvalue().decode('ascii'), expected)
3651
def test_issue25862(self):
    # Assertion failures occurred in tell() after read() and write().
    # Scenario 1: partial read, read of the remainder, then tell().
    wrapper = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
    wrapper.read(1)
    wrapper.read()
    wrapper.tell()
    # Scenario 2: partial read, a write, then tell().
    wrapper = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
    wrapper.read(1)
    wrapper.write('x')
    wrapper.tell()
3662
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003663
class MemviewBytesIO(io.BytesIO):
    '''BytesIO variant whose read() and read1() hand back memoryviews
       (built by _to_memoryview) instead of bytes objects'''

    def read1(self, len_):
        chunk = super().read1(len_)
        return _to_memoryview(chunk)

    def read(self, len_):
        chunk = super().read(len_)
        return _to_memoryview(chunk)
3673
def _to_memoryview(buf):
    '''Return a memoryview over an array of C ints filled from *buf*.

    Trailing bytes that do not fill a whole array item are dropped.'''

    ints = array.array('i')
    whole_items = len(buf) // ints.itemsize
    ints.frombytes(buf[:whole_items * ints.itemsize])
    return memoryview(ints)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003681
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003682
class CTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests with the "io" module and adds
    # a few checks of its own.
    io = io
    # Text looked for on the child's stderr by
    # test_create_at_shutdown_without_encoding.
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        # Re-running __init__ with a bogus newline must fail, and the
        # object must refuse to read afterwards.
        self.assertRaises(ValueError, wrapper.__init__, buffered,
                          newline='xyzzy')
        self.assertRaises(ValueError, wrapper.read)

        # repr() of an instance whose __init__ never ran must raise.
        wrapper = self.TextIOWrapper.__new__(self.TextIOWrapper)
        self.assertRaises(Exception, repr, wrapper)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            raw_file = io.FileIO(support.TESTFN, "wb")
            buffered = self.BufferedWriter(raw_file)
            wrapper = self.TextIOWrapper(buffered, encoding="ascii")
            wrapper.write("456def")
            # Self-reference: only the cycle collector can reclaim it.
            wrapper.x = wrapper
            ref = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as saved:
            self.assertEqual(saved.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _round in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()

    def test_del__CHUNK_SIZE_SystemError(self):
        # Deleting _CHUNK_SIZE must be reported as an AttributeError.
        wrapper = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
        with self.assertRaises(AttributeError):
            del wrapper._CHUNK_SIZE
3732
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003733
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests with the "pyio" module.
    io = pyio
    # Text looked for on the child's stderr by
    # test_create_at_shutdown_without_encoding.
    shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003737
3738
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Shared checks for self.IncrementalNewlineDecoder.
    # NOTE(review): that attribute is not set in this class; presumably
    # it is bound on the concrete subclasses elsewhere -- confirm.

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _roundtrip(data, expected, **decode_kwargs):
            # We exercise getstate() / setstate() as well as decode():
            # decode once, rewind to the saved state, decode again.
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(data, **decode_kwargs), expected)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(data, **decode_kwargs), expected)

        # A complete three-byte character in one call.
        _roundtrip(b'\xe8\xa2\x88', "\u8888")

        # The same character fed one byte at a time, twice over.
        _roundtrip(b'\xe8', "")
        _roundtrip(b'\xa2', "")
        _roundtrip(b'\x88', "\u8888")

        _roundtrip(b'\xe8', "")
        _roundtrip(b'\xa2', "")
        _roundtrip(b'\x88', "\u8888")

        # A truncated character at the end of input is an error.
        _roundtrip(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        _roundtrip(b'\n', "\n")
        _roundtrip(b'\r', "")
        _roundtrip(b'', "\n", final=True)
        _roundtrip(b'\r', "\n", final=True)

        _roundtrip(b'\r', "")
        _roundtrip(b'a', "\na")

        _roundtrip(b'\r\r\n', "\n\n")
        _roundtrip(b'\r', "")
        _roundtrip(b'\r', "\n")
        _roundtrip(b'\na', "\na")

        _roundtrip(b'\xe8\xa2\x88\r\n', "\u8888\n")
        _roundtrip(b'\xe8\xa2\x88', "\u8888")
        _roundtrip(b'\n', "\n")
        _roundtrip(b'\xe8\xa2\x88\r', "\u8888")
        _roundtrip(b'\n', "\n")

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to *decoder* in the smallest possible pieces and
        # follow its "newlines" attribute.  With encoding=None the
        # decoder is fed str, otherwise bytes produced by that encoding.
        pieces = []
        if encoding is None:
            encoder = None

            def _decode_bytewise(text):
                # Decode one char at a time
                for char in text:
                    pieces.append(decoder.decode(char))
        else:
            encoder = codecs.getincrementalencoder(encoding)()

            def _decode_bytewise(text):
                # Decode one byte at a time
                for byte in encoder.encode(text):
                    pieces.append(decoder.decode(bytes([byte])))

        self.assertEqual(decoder.newlines, None)
        newline_steps = (
            # (text fed, value of decoder.newlines afterwards)
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, seen in newline_steps:
            _decode_bytewise(text)
            self.assertEqual(decoder.newlines, seen)
        _decode_bytewise("abc\r")
        self.assertEqual("".join(pieces), "abc\n\nabcabc\nabcabc")

        # After reset() the decoder has no record of earlier newlines.
        decoder.reset()
        payload = "abc"
        if encoder is not None:
            encoder.reset()
            payload = encoder.encode(payload)
        self.assertEqual(decoder.decode(payload), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                inner = None
            else:
                inner = codecs.getincrementaldecoder(enc)()
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(wrapped)
        # setstate() must reject a state that is not of the right type.
        self.assertRaises(TypeError, wrapped.setstate, 42)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        for translate in (False, True):
            dec = self.IncrementalNewlineDecoder(None, translate=translate)
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                # Neither character may be taken for a newline.
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)

    def test_translate(self):
        # issue 35062
        # Non-zero integers passed as "translate" run the full UTF-8
        # newline check.
        for translate in (-2, -1, 1, 2):
            inner = codecs.getincrementaldecoder("utf-8")()
            wrapped = self.IncrementalNewlineDecoder(inner, translate)
            self.check_newline_decoding_utf8(wrapped)
        # translate=0 leaves the line endings untouched.
        inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=0)
        self.assertEqual(wrapped.decode(b"\r\r\n"), "\r\r\n")
3855
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Concrete runner for the shared decoder tests; adds nothing of its own.
    # NOTE(review): presumably the C IncrementalNewlineDecoder is bound
    # onto this class elsewhere in the file -- confirm.
    pass
3858
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Concrete runner for the shared decoder tests; adds nothing of its own.
    # NOTE(review): presumably the pure-Python IncrementalNewlineDecoder
    # is bound onto this class elsewhere in the file -- confirm.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003861
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003862
Guido van Rossum01a27522007-03-07 01:00:12 +00003863# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003864
class MiscIOTest(unittest.TestCase):
    # Assorted checks written against self.io / self.open and the other
    # names bound on the test case, so subclasses choose the
    # implementation.

    def tearDown(self):
        # Remove the scratch file after every test.
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Every exported name must exist and, apart from open() and the
        # SEEK_* constants, be an exception class or an IOBase subclass.
        for name in self.io.__all__:
            member = getattr(self.io, name, None)
            self.assertIsNotNone(member, name)
            if name == "open":
                continue
            is_exception = ("error" in name.lower()
                            or name == "UnsupportedOperation")
            if is_exception:
                self.assertTrue(issubclass(member, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(member, self.IOBase))

    def test_attributes(self):
        # Unbuffered binary file: the mode is reported as given.
        stream = self.open(support.TESTFN, "wb", buffering=0)
        self.assertEqual(stream.mode, "wb")
        stream.close()

        # Opening in "U" mode is expected to emit a DeprecationWarning.
        with support.check_warnings(('', DeprecationWarning)):
            stream = self.open(support.TESTFN, "U")
        self.assertEqual(stream.name, support.TESTFN)
        self.assertEqual(stream.buffer.name, support.TESTFN)
        self.assertEqual(stream.buffer.raw.name, support.TESTFN)
        self.assertEqual(stream.mode, "U")
        self.assertEqual(stream.buffer.mode, "rb")
        self.assertEqual(stream.buffer.raw.mode, "rb")
        stream.close()

        stream = self.open(support.TESTFN, "w+")
        self.assertEqual(stream.mode, "w+")
        self.assertEqual(stream.buffer.mode, "rb+")  # Does it really matter?
        self.assertEqual(stream.buffer.raw.mode, "rb+")

        # A second file object on the same descriptor, not owning it.
        alias = self.open(stream.fileno(), "wb", closefd=False)
        self.assertEqual(alias.mode, "wb")
        self.assertEqual(alias.raw.mode, "wb")
        self.assertEqual(alias.name, stream.fileno())
        self.assertEqual(alias.raw.name, stream.fileno())
        stream.close()
        alias.close()

    def test_io_after_close(self):
        # Every I/O method of a closed file must raise ValueError.
        open_variants = [
            {"mode": "w"},
            {"mode": "wb"},
            {"mode": "w", "buffering": 1},
            {"mode": "w", "buffering": 2},
            {"mode": "wb", "buffering": 0},
            {"mode": "r"},
            {"mode": "rb"},
            {"mode": "r", "buffering": 1},
            {"mode": "r", "buffering": 2},
            {"mode": "rb", "buffering": 0},
            {"mode": "w+"},
            {"mode": "w+b"},
            {"mode": "w+", "buffering": 1},
            {"mode": "w+", "buffering": 2},
            {"mode": "w+b", "buffering": 0},
        ]
        for kwargs in open_variants:
            stream = self.open(support.TESTFN, **kwargs)
            stream.close()
            if "b" in kwargs['mode']:
                empty_payload = b""
            else:
                empty_payload = ""
            # (method name, arguments, only checked when present)
            checks = [
                ("flush", (), False),
                ("fileno", (), False),
                ("isatty", (), False),
                ("__iter__", (), False),
                ("peek", (1,), True),
                ("read", (), False),
                ("read1", (1024,), True),
                ("read1", (), True),
                ("readall", (), True),
                ("readinto", (bytearray(1024),), True),
                ("readinto1", (bytearray(1024),), True),
                ("readline", (), False),
                ("readlines", (), False),
                ("readlines", (1,), False),
                ("seek", (0,), False),
                ("tell", (), False),
                ("truncate", (), False),
                ("write", (empty_payload,), False),
                ("writelines", ([],), False),
            ]
            for method_name, call_args, optional in checks:
                if optional and not hasattr(stream, method_name):
                    continue
                self.assertRaises(ValueError, getattr(stream, method_name),
                                  *call_args)
            self.assertRaises(ValueError, next, stream)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        class C(str):
            pass
        payload = C("")
        error = self.BlockingIOError(1, payload)
        # Tie the two objects into a reference cycle.
        payload.b = error
        error.c = payload
        ref = weakref.ref(payload)
        del payload, error
        support.gc_collect()
        self.assertIsNone(ref(), ref)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        visible_bases = (
            self.IOBase,
            self.RawIOBase,
            self.BufferedIOBase,
            self.TextIOBase,
        )
        for base in visible_bases:
            self.assertIsInstance(base, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        # For each way of opening the scratch file, the object must be
        # an IOBase and an instance of exactly one of the three layer
        # ABCs looked up on *abcmodule*.
        layers = ("RawIOBase", "BufferedIOBase", "TextIOBase")
        cases = (
            # (mode, extra open() keywords, the one layer expected)
            ("wb", {"buffering": 0}, "RawIOBase"),
            ("wb", {}, "BufferedIOBase"),
            ("w", {}, "TextIOBase"),
        )
        for mode, open_kwargs, own_layer in cases:
            with self.open(support.TESTFN, mode, **open_kwargs) as f:
                self.assertIsInstance(f, abcmodule.IOBase)
                for layer in layers:
                    layer_cls = getattr(abcmodule, layer)
                    if layer == own_layer:
                        self.assertIsInstance(f, layer_cls)
                    else:
                        self.assertNotIsInstance(f, layer_cls)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)

    def _check_warn_on_dealloc(self, *args, **kwargs):
        # Dropping the last reference to an unclosed file must emit a
        # ResourceWarning whose message contains the file's repr.
        # NOTE(review): this goes through the builtin open(), not
        # self.open -- confirm that is intended.
        fileobj = open(*args, **kwargs)
        description = repr(fileobj)
        with self.assertWarns(ResourceWarning) as cm:
            fileobj = None
            support.gc_collect()
        self.assertIn(description, str(cm.warning.args[0]))

    def test_warn_on_dealloc(self):
        self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
        self._check_warn_on_dealloc(support.TESTFN, "wb")
        self._check_warn_on_dealloc(support.TESTFN, "w")

    def _check_warn_on_dealloc_fd(self, *args, **kwargs):
        # Same check for files opened from a pipe descriptor; every
        # descriptor created here is closed again at cleanup time.
        pipe_fds = []

        def cleanup_fds():
            for fd in pipe_fds:
                try:
                    os.close(fd)
                except OSError as exc:
                    # A descriptor that is already closed is fine.
                    if exc.errno != errno.EBADF:
                        raise
        self.addCleanup(cleanup_fds)
        read_fd, write_fd = os.pipe()
        pipe_fds.extend((read_fd, write_fd))
        self._check_warn_on_dealloc(read_fd, *args, **kwargs)
        # When using closefd=False, there's no warning
        read_fd, write_fd = os.pipe()
        pipe_fds.extend((read_fd, write_fd))
        with support.check_no_resource_warning(self):
            open(read_fd, *args, closefd=False, **kwargs)

    def test_warn_on_dealloc_fd(self):
        self._check_warn_on_dealloc_fd("rb", buffering=0)
        self._check_warn_on_dealloc_fd("rb")
        self._check_warn_on_dealloc_fd("r")

    def test_pickling(self):
        # Pickling file objects is forbidden
        open_variants = [
            {"mode": "w"},
            {"mode": "wb"},
            {"mode": "wb", "buffering": 0},
            {"mode": "r"},
            {"mode": "rb"},
            {"mode": "rb", "buffering": 0},
            {"mode": "w+"},
            {"mode": "w+b"},
            {"mode": "w+b", "buffering": 0},
        ]
        for kwargs in open_variants:
            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
                with self.open(support.TESTFN, **kwargs) as stream:
                    self.assertRaises(TypeError, pickle.dumps, stream,
                                      protocol)

    def test_nonblock_pipe_write_bigbuf(self):
        self._test_nonblock_pipe_write(16*1024)

    def test_nonblock_pipe_write_smallbuf(self):
        self._test_nonblock_pipe_write(1024)

    @unittest.skipUnless(hasattr(os, 'set_blocking'),
                         'os.set_blocking() required for this test')
    def _test_nonblock_pipe_write(self, bufsize):
        # Push data through a non-blocking pipe with the given buffer
        # size and check that what was read equals what was written.
        sent = []
        received = []
        read_fd, write_fd = os.pipe()
        os.set_blocking(read_fd, False)
        os.set_blocking(write_fd, False)

        # To exercise all code paths in the C implementation we need
        # to play with buffer sizes.  For instance, if we choose a
        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
        # then we will never get a partial write of the buffer.
        reader = self.open(read_fd, mode='rb', closefd=True,
                           buffering=bufsize)
        writer = self.open(write_fd, mode='wb', closefd=True,
                           buffering=bufsize)

        with reader, writer:
            for repeat in 9999, 73, 7574:
                try:
                    counter = 0
                    # Keep writing until the pipe refuses more data.
                    while True:
                        msg = bytes([counter % 26 + 97]) * repeat
                        sent.append(msg)
                        writer.write(msg)
                        counter += 1

                except self.BlockingIOError as exc:
                    self.assertEqual(exc.args[0], errno.EAGAIN)
                    self.assertEqual(exc.args[2], exc.characters_written)
                    # Only part of the last message was accepted.
                    sent[-1] = sent[-1][:exc.characters_written]
                    received.append(reader.read())
                    msg = b'BLOCKED'
                    writer.write(msg)
                    sent.append(msg)

            # Drain: alternate flushing and reading until flush works.
            while True:
                try:
                    writer.flush()
                    break
                except self.BlockingIOError as exc:
                    self.assertEqual(exc.args[0], errno.EAGAIN)
                    self.assertEqual(exc.args[2], exc.characters_written)
                    self.assertEqual(exc.characters_written, 0)
                    received.append(reader.read())

            # Collect whatever is left, until read() returns None.
            received.extend(iter(reader.read, None))

        sent_bytes = b''.join(sent)
        received_bytes = b''.join(received)
        self.assertEqual(sent_bytes, received_bytes)
        self.assertTrue(writer.closed)
        self.assertTrue(reader.closed)

    def test_create_fail(self):
        # 'x' mode fails if file is existing
        with self.open(support.TESTFN, 'w'):
            pass
        self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')

    def test_create_writes(self):
        # 'x' mode opens for writing
        with self.open(support.TESTFN, 'xb') as created:
            created.write(b"spam")
        with self.open(support.TESTFN, 'rb') as reopened:
            self.assertEqual(b"spam", reopened.read())

    def test_open_allargs(self):
        # there used to be a buffer overflow in the parser for rawmode
        self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
4131
4132
class CMiscIOTest(MiscIOTest):
    # Runs the shared MiscIOTest checks with the "io" module and adds a
    # few checks of its own.
    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        # read() returns far more data than the two-byte target of
        # readinto() can hold; that must be reported as a ValueError.
        class BadReader(self.io.BufferedIOBase):
            def read(self, n=-1):
                return b'x' * 10**6
        reader = BadReader()
        target = bytearray(2)
        self.assertRaises(ValueError, reader.readinto, target)

    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        script = """if 1:
            import sys
            import time
            import threading
            from test.support import SuppressCrashReport

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            crash = SuppressCrashReport()
            crash.__enter__()
            # don't call __exit__(): the crash occurs at Python shutdown

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """.format(stream_name=stream_name)
        res, _ = run_python_until_end("-c", script)
        stderr_text = res.err.decode()
        if res.rc == 0:
            # Clean exit: stderr may hold nothing beyond the '.' and
            # '!' characters the script itself writes.
            self.assertFalse(stderr_text.strip('.!'))
            return
        # Failure: should be a fatal error
        expected = ("Fatal Python error: could not acquire lock "
                    "for <_io.BufferedWriter name='<{stream_name}>'> "
                    "at interpreter shutdown, possibly due to "
                    "daemon threads".format(stream_name=stream_name))
        self.assertIn(expected, stderr_text)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stdout')

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stderr')
4190
4191
# Runs the shared MiscIOTest cases against the module bound to the name
# ``pyio`` (the counterpart of CMiscIOTest, which uses ``io``).
class PyMiscIOTest(MiscIOTest):
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00004194
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004195
# Base class for tests of how file objects returned by ``self.io.open``
# behave when SIGALRM is delivered during reads and writes on a pipe.
# The ``io`` attribute is not defined here; the subclasses CSignalsTest and
# PySignalsTest supply it.
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):

    def setUp(self):
        # Install alarm_interrupt as the SIGALRM handler and remember the
        # previous handler so tearDown can put it back.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        # Restore the SIGALRM handler saved by setUp.
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raise ZeroDivisionError from inside the signal
        # handler; the tests assert that this exception propagates.
        1/0

    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        item is the unit of data that is repeated to build the payload,
        bytes is the value whose first two one-byte slices are compared
        with what is read from the pipe, and fdopen_kwargs are passed on
        to self.io.open() (with closefd forced to False)."""
        read_results = []
        def _read():
            # Runs in the helper thread: take a single byte off the pipe.
            s = os.read(r, 1)
            read_results.append(s)

        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # One repetition more than PIPE_MAX_SIZE // len(item), so the
        # payload is longer than support.PIPE_MAX_SIZE.
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            if hasattr(signal, 'pthread_sigmask'):
                # create the thread with SIGALRM signal blocked
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
                t.start()
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM])
            else:
                t.start()

            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed just below.  Since
            # the other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                # Cancel any pending alarm and wait for the reader thread.
                signal.alarm(0)
                t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            try:
                wio.close()
            except OSError as e:
                # The descriptor is already closed, so EBADF is expected;
                # anything else is re-raised.
                if e.errno != errno.EBADF:
                    raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check what happens when a SIGALRM handler writes to the same
        file object that the interrupted code is writing to.

        data is the value written on every call; fdopen_kwargs are passed
        on to self.io.open().  The accepted outcomes are ZeroDivisionError
        (raised by the handler) or a RuntimeError whose message starts
        with "reentrant call"."""
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        decode is applied to the value returned by read() before it is
        compared with the str "foobar"; fdopen_kwargs are passed on to
        self.io.open() (with closefd forced to False)."""
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            # Supplies the second half of the data from inside the handler.
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            signal.alarm(0)
            rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        item is a unit of data that is repeated support.PIPE_MAX_SIZE
        times to build the payload; fdopen_kwargs are passed on to
        self.io.open() (with closefd forced to False)."""
        select = support.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        error = None
        def _read():
            try:
                # Keep draining the pipe until the main thread sets
                # write_finished; select() uses a 1.0 second timeout so
                # the flag is re-checked regularly.
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                # Hand any failure to the main thread, which asserts that
                # error is None.
                nonlocal error
                error = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: switch the handler to alarm2 and arm again.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start the reader thread.
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error)
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            signal.alarm(0)
            # Make sure the reader loop can terminate on the failure path.
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            try:
                wio.close()
            except OSError as e:
                # The descriptor is already closed, so EBADF is expected;
                # anything else is re-raised.
                if e.errno != errno.EBADF:
                    raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4408
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004409
# Runs the SignalsTest cases with ``self.io`` bound to the ``io`` module.
class CSignalsTest(SignalsTest):
    io = io
4412
# Runs the SignalsTest cases with ``self.io`` bound to the ``pyio`` module.
class PySignalsTest(SignalsTest):
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    # Rebinding the inherited test names to None hides the base-class test
    # methods for this subclass.
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None
4420
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004421
def load_tests(*args):
    """Build and return the suite of io tests.

    Before the suite is assembled, every test class whose name starts with
    "C" or "Py" has the matching implementation's public names and mock
    classes set on it as class attributes.  That is how tests refer to the
    type under test as an attribute (e.g. ``self.open``).  Test classes
    with any other prefix are left untouched.

    The positional arguments are accepted and ignored.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
             SlowFlushRawIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    # Each mock has module-level "C"- and "Py"-prefixed variants; expose
    # them under the unprefixed name in the matching namespace.
    globs = globals()
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    for test in tests:
        if test.__name__.startswith("C"):
            namespace = c_io_ns
        elif test.__name__.startswith("Py"):
            namespace = py_io_ns
        else:
            # Implementation-independent test class: nothing to inject.
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    # unittest.makeSuite() is deprecated (and removed in Python 3.13);
    # TestLoader.loadTestsFromTestCase() is the supported equivalent.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite(
        [loader.loadTestsFromTestCase(test) for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004458
# Run the tests through unittest's command-line entry point when this file
# is executed as a script.
if __name__ == "__main__":
    unittest.main()