blob: d245c5d846ad760e9af399e02be24ddd5e6a7620 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Gregory P. Smithe5796c42018-12-30 20:17:57 -080031import sysconfig
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020032import threading
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000033import time
Guido van Rossum28524c72007-02-27 05:47:44 +000034import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000035import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020037from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020038from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000039from test import support
Berker Peksagce643912015-05-06 06:33:17 +030040from test.support.script_helper import assert_python_ok, run_python_until_end
Serhiy Storchakab21d1552018-03-02 11:53:51 +020041from test.support import FakePath
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000042
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000043import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000044import io # C implementation of io
45import _pyio as pyio # Python implementation of io
Guido van Rossuma9e20242007-03-08 00:43:48 +000046
# byteslike() wraps a bytes payload in an object that supports the buffer
# protocol; which container is used depends on whether ctypes can be imported.
try:
    import ctypes
except ImportError:
    def byteslike(*pos, **kw):
        """Return bytes(*pos, **kw) copied into an array.array("b")."""
        payload = bytes(*pos, **kw)
        return array.array("b", payload)
else:
    class EmptyStruct(ctypes.Structure):
        """Field-less ctypes structure, resized on demand to hold the data."""

    def byteslike(*pos, **kw):
        """Create a bytes-like object having no string or sequence methods"""
        payload = bytes(*pos, **kw)
        holder = EmptyStruct()
        # Grow the structure's storage to fit, then copy the payload into it
        # through an unsigned-byte view.
        ctypes.resize(holder, len(payload))
        byte_view = memoryview(holder).cast("B")
        byte_view[:] = payload
        return holder
62
# Decide from the interpreter's recorded build settings whether it was built
# with the memory sanitizer enabled.  Missing config variables count as ''.
_cflags = sysconfig.get_config_var('CFLAGS')
if not _cflags:
    _cflags = ''
_config_args = sysconfig.get_config_var('CONFIG_ARGS')
if not _config_args:
    _config_args = ''
if '-fsanitize=memory' in _cflags:
    MEMORY_SANITIZER = True
else:
    MEMORY_SANITIZER = '--with-memory-sanitizer' in _config_args
69
def _default_chunk_size():
    """Get the default TextIOWrapper chunk size.

    This module's own source file is opened in text mode (latin-1) only to
    obtain a text stream whose _CHUNK_SIZE attribute can be read.
    """
    with open(__file__, "r", encoding="latin-1") as probe:
        chunk_size = probe._CHUNK_SIZE
    return chunk_size
74
75
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    Reads are scripted: each entry of *read_stack* is handed out in order,
    and a None entry makes readinto() return None.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        # One bytes snapshot per write() call.
        self._write_stack = []
        # Total read attempts, and how many happened after the script ran out.
        self._reads = 0
        self._extraneous_reads = 0

    # -- capabilities: this mock claims to support everything ---------------

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def fileno(self):
        return 42

    # -- positioning: dummy answers ------------------------------------------

    def seek(self, pos, whence):
        # Not a real position, but something has to be returned.
        return 0

    def tell(self):
        # Same remark as for seek().
        return 0

    def truncate(self, pos=None):
        return pos

    # -- data transfer ---------------------------------------------------------

    def write(self, b):
        """Record a bytes copy of *b* and report len(b) as written."""
        snapshot = bytes(b)
        self._write_stack.append(snapshot)
        return len(b)

    def readinto(self, buf):
        """Copy the next scripted chunk into *buf*.

        Returns the number of bytes copied, None when the next scripted
        entry is None, or 0 when the script is exhausted.  A chunk longer
        than *buf* is split; its remainder stays queued for the next call.
        """
        self._reads += 1
        capacity = len(buf)
        pending = self._read_stack
        if not pending:
            self._extraneous_reads += 1
            return 0
        head = pending[0]
        if head is None:
            pending.pop(0)
            return None
        size = len(head)
        if size > capacity:
            buf[:] = head[:capacity]
            pending[0] = head[capacity:]
            return capacity
        pending.pop(0)
        buf[:size] = head
        return size
131
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead combined with the C implementation's RawIOBase."""


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead combined with the pure-Python RawIOBase."""
137
138
class MockRawIO(MockRawIOWithoutRead):
    """Scripted raw stream mock that also provides read()."""

    def read(self, n=None):
        """Return the next scripted entry, or b"" once the script is empty.

        *n* is accepted for signature compatibility but is not consulted:
        each call hands out one whole entry of the read script.  Every call
        is counted in _reads; calls made after the script has run out are
        also counted in _extraneous_reads.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script is an "extraneous read".  Catching
            # IndexError specifically (instead of a bare except) lets
            # KeyboardInterrupt and genuine bugs propagate.
            self._extraneous_reads += 1
            return b""
148
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with the C implementation's RawIOBase."""


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with the pure-Python RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000154
Guido van Rossuma9e20242007-03-08 00:43:48 +0000155
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose return values contradict what really happened.

    Writes and reads report doubled results, positions are negative, and
    readinto() claims five times the buffer length.
    """

    def write(self, b):
        reported = super().write(b)
        return reported * 2

    def read(self, n=None):
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Perform the real transfer, then lie about how much was read.
        super().readinto(buf)
        capacity = len(buf)
        return capacity * 5
172
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with the C implementation's RawIOBase."""


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with the pure-Python RawIOBase."""
178
179
class SlowFlushRawIO(MockRawIO):
    """MockRawIO whose flush() announces itself and then takes a while."""

    def __init__(self):
        super().__init__()
        # Set as soon as flush() has been entered.
        flush_entered = threading.Event()
        self.in_flush = flush_entered

    def flush(self):
        """Signal in_flush, then sleep for a quarter of a second."""
        self.in_flush.set()
        delay = 0.25
        time.sleep(delay)
188
class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
    """SlowFlushRawIO combined with the C implementation's RawIOBase."""


class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
    """SlowFlushRawIO combined with the pure-Python RawIOBase."""
194
195
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() call fails with OSError."""

    # Class-level default; close() replaces it with 1 on the instance.
    closed = 0

    def close(self):
        """Mark the stream closed and raise OSError, but only the first time."""
        if self.closed:
            return
        self.closed = 1
        raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000203
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with the C implementation's RawIOBase."""


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with the pure-Python RawIOBase."""
209
210
class MockFileIO:
    """Mixin that logs the size of every read performed on the stream.

    It must be combined with a concrete stream class; the actual reading is
    delegated to that class through super().
    """

    def __init__(self, data):
        # One entry per read()/readinto() call: a byte count, or None.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        """Delegate to the next class and log len(result) (None stays None)."""
        result = super().read(n)
        if result is None:
            logged = None
        else:
            logged = len(result)
        self.read_history.append(logged)
        return result

    def readinto(self, b):
        """Delegate to the next class and log the value it returned."""
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000226
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO backed by the C implementation's BytesIO."""


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO backed by the pure-Python BytesIO."""
232
233
class MockUnseekableIO:
    """Mixin turning a stream into one that refuses all positioning calls.

    The concrete subclass must provide an UnsupportedOperation attribute
    naming the exception class to raise.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def tell(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def truncate(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error
246
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """Unseekable BytesIO from the C implementation."""

    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """Unseekable BytesIO from the pure-Python implementation."""

    UnsupportedOperation = pyio.UnsupportedOperation
252
253
class MockNonBlockWriterIO:
    """Writer mock that can pretend a write would block.

    Written data is collected chunk by chunk.  After block_on() has armed a
    marker, a write containing it is cut short just before the marker; a
    write that starts with the marker returns None and disarms it.
    """

    def __init__(self):
        self._blocker_char = None
        self._write_stack = []

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def pop_written(self):
        """Return everything written so far as one bytes object and reset."""
        joined = b"".join(self._write_stack)
        self._write_stack.clear()
        return joined

    def write(self, b):
        """Store *b* (as bytes) and return the number of bytes accepted.

        Returns None, storing nothing, when the armed marker sits at the very
        start of the data.
        """
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            try:
                cut = payload.index(blocker)
            except ValueError:
                # Marker absent from this chunk: treat it as a normal write.
                cut = None
            if cut is not None:
                if cut > 0:
                    # write data up to the first blocker
                    self._write_stack.append(payload[:cut])
                    return cut
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(payload)
        return len(payload)
297
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO combined with the C implementation's RawIOBase."""

    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO combined with the pure-Python RawIOBase."""

    BlockingIOError = pyio.BlockingIOError
303
Guido van Rossuma9e20242007-03-08 00:43:48 +0000304
Guido van Rossum28524c72007-02-27 05:47:44 +0000305class IOTest(unittest.TestCase):
306
Neal Norwitze7789b12008-03-24 06:18:09 +0000307 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000308 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000309
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000310 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000311 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000312
Guido van Rossum28524c72007-02-27 05:47:44 +0000313 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000314 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000315 f.truncate(0)
316 self.assertEqual(f.tell(), 5)
317 f.seek(0)
318
319 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000320 self.assertEqual(f.seek(0), 0)
321 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000322 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000323 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000324 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000325 buffer = bytearray(b" world\n\n\n")
326 self.assertEqual(f.write(buffer), 9)
327 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000328 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000329 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000330 self.assertEqual(f.seek(-1, 2), 13)
331 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000332
Guido van Rossum87429772007-04-10 21:06:59 +0000333 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000334 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000335 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000336
Guido van Rossum9b76da62007-04-11 01:09:03 +0000337 def read_ops(self, f, buffered=False):
338 data = f.read(5)
339 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000340 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000341 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000342 self.assertEqual(bytes(data), b" worl")
343 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000344 self.assertEqual(f.readinto(data), 2)
345 self.assertEqual(len(data), 5)
346 self.assertEqual(data[:2], b"d\n")
347 self.assertEqual(f.seek(0), 0)
348 self.assertEqual(f.read(20), b"hello world\n")
349 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000350 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000351 self.assertEqual(f.seek(-6, 2), 6)
352 self.assertEqual(f.read(5), b"world")
353 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000354 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000355 self.assertEqual(f.seek(-6, 1), 5)
356 self.assertEqual(f.read(5), b" worl")
357 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000358 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000359 if buffered:
360 f.seek(0)
361 self.assertEqual(f.read(), b"hello world\n")
362 f.seek(6)
363 self.assertEqual(f.read(), b"world\n")
364 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000365 f.seek(0)
366 data = byteslike(5)
367 self.assertEqual(f.readinto1(data), 5)
368 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000369
Guido van Rossum34d69e52007-04-10 20:08:41 +0000370 LARGE = 2**31
371
Guido van Rossum53807da2007-04-10 19:01:47 +0000372 def large_file_ops(self, f):
373 assert f.readable()
374 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100375 try:
376 self.assertEqual(f.seek(self.LARGE), self.LARGE)
377 except (OverflowError, ValueError):
378 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000379 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000380 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000381 self.assertEqual(f.tell(), self.LARGE + 3)
382 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000383 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000384 self.assertEqual(f.tell(), self.LARGE + 2)
385 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000386 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000387 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000388 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
389 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000390 self.assertEqual(f.read(2), b"x")
391
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000392 def test_invalid_operations(self):
393 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000394 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000395 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000396 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000397 self.assertRaises(exc, fp.read)
398 self.assertRaises(exc, fp.readline)
399 with self.open(support.TESTFN, "wb", buffering=0) as fp:
400 self.assertRaises(exc, fp.read)
401 self.assertRaises(exc, fp.readline)
402 with self.open(support.TESTFN, "rb", buffering=0) as fp:
403 self.assertRaises(exc, fp.write, b"blah")
404 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000405 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000406 self.assertRaises(exc, fp.write, b"blah")
407 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000408 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000409 self.assertRaises(exc, fp.write, "blah")
410 self.assertRaises(exc, fp.writelines, ["blah\n"])
411 # Non-zero seeking from current or end pos
412 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
413 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000414
    def test_optional_abilities(self):
        # Test for OSError when optional APIs are not supported
        # The purpose of this test is to try fileno(), reading, writing and
        # seeking operations with various objects that indicate they do not
        # support these operations.
        #
        # Each factory below builds one kind of stream.  It is paired with a
        # string of ability letters: "f" = has fileno(), "r" = readable,
        # "w" = writable, "s" = seekable.  A missing letter means the
        # corresponding calls must raise OSError.

        def pipe_reader():
            [r, w] = os.pipe()
            os.close(w)  # So that read() is harmless
            return self.FileIO(r, "r")

        def pipe_writer():
            [r, w] = os.pipe()
            self.addCleanup(os.close, r)
            # Guarantee that we can write into the pipe without blocking
            thread = threading.Thread(target=os.read, args=(r, 100))
            thread.start()
            self.addCleanup(thread.join)
            return self.FileIO(w, "w")

        def buffered_reader():
            return self.BufferedReader(self.MockUnseekableIO())

        def buffered_writer():
            return self.BufferedWriter(self.MockUnseekableIO())

        def buffered_random():
            return self.BufferedRandom(self.BytesIO())

        def buffered_rw_pair():
            return self.BufferedRWPair(self.MockUnseekableIO(),
                self.MockUnseekableIO())

        def text_reader():
            # Swap in BufferedIOBase's writable/write for the underlying
            # stream, so the wrapper is built on a read-only object.
            class UnseekableReader(self.MockUnseekableIO):
                writable = self.BufferedIOBase.writable
                write = self.BufferedIOBase.write
            return self.TextIOWrapper(UnseekableReader(), "ascii")

        def text_writer():
            # Mirror image of text_reader(): readable/read come from
            # BufferedIOBase instead.
            class UnseekableWriter(self.MockUnseekableIO):
                readable = self.BufferedIOBase.readable
                read = self.BufferedIOBase.read
            return self.TextIOWrapper(UnseekableWriter(), "ascii")

        tests = (
            (pipe_reader, "fr"), (pipe_writer, "fw"),
            (buffered_reader, "r"), (buffered_writer, "w"),
            (buffered_random, "rws"), (buffered_rw_pair, "rw"),
            (text_reader, "r"), (text_writer, "w"),
            (self.BytesIO, "rws"), (self.StringIO, "rws"),
        )
        for [test, abilities] in tests:
            # The stream is used as a context manager, so it is closed at the
            # end of each iteration (including on the ``continue`` below).
            with self.subTest(test), test() as obj:
                readable = "r" in abilities
                self.assertEqual(obj.readable(), readable)
                writable = "w" in abilities
                self.assertEqual(obj.writable(), writable)

                # Pick str or bytes test data to match the stream's layer.
                if isinstance(obj, self.TextIOBase):
                    data = "3"
                elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
                    data = b"3"
                else:
                    self.fail("Unknown base class")

                if "f" in abilities:
                    obj.fileno()
                else:
                    self.assertRaises(OSError, obj.fileno)

                if readable:
                    obj.read(1)
                    obj.read()
                else:
                    self.assertRaises(OSError, obj.read, 1)
                    self.assertRaises(OSError, obj.read)

                if writable:
                    obj.write(data)
                else:
                    self.assertRaises(OSError, obj.write, data)

                if sys.platform.startswith("win") and test in (
                        pipe_reader, pipe_writer):
                    # Pipes seem to appear as seekable on Windows
                    continue
                seekable = "s" in abilities
                self.assertEqual(obj.seekable(), seekable)

                if seekable:
                    obj.tell()
                    obj.seek(0)
                else:
                    self.assertRaises(OSError, obj.tell)
                    self.assertRaises(OSError, obj.seek, 0)

                # truncate() is only expected to work on streams that are
                # both writable and seekable.
                if writable and seekable:
                    obj.truncate()
                    obj.truncate(0)
                else:
                    self.assertRaises(OSError, obj.truncate)
                    self.assertRaises(OSError, obj.truncate, 0)
518
Antoine Pitrou13348842012-01-29 18:36:34 +0100519 def test_open_handles_NUL_chars(self):
520 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300521 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100522
523 bytes_fn = bytes(fn_with_NUL, 'ascii')
524 with warnings.catch_warnings():
525 warnings.simplefilter("ignore", DeprecationWarning)
526 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100527
Guido van Rossum28524c72007-02-27 05:47:44 +0000528 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000529 with self.open(support.TESTFN, "wb", buffering=0) as f:
530 self.assertEqual(f.readable(), False)
531 self.assertEqual(f.writable(), True)
532 self.assertEqual(f.seekable(), True)
533 self.write_ops(f)
534 with self.open(support.TESTFN, "rb", buffering=0) as f:
535 self.assertEqual(f.readable(), True)
536 self.assertEqual(f.writable(), False)
537 self.assertEqual(f.seekable(), True)
538 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000539
Guido van Rossum87429772007-04-10 21:06:59 +0000540 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000541 with self.open(support.TESTFN, "wb") as f:
542 self.assertEqual(f.readable(), False)
543 self.assertEqual(f.writable(), True)
544 self.assertEqual(f.seekable(), True)
545 self.write_ops(f)
546 with self.open(support.TESTFN, "rb") as f:
547 self.assertEqual(f.readable(), True)
548 self.assertEqual(f.writable(), False)
549 self.assertEqual(f.seekable(), True)
550 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000551
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000552 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000553 with self.open(support.TESTFN, "wb") as f:
554 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
555 with self.open(support.TESTFN, "rb") as f:
556 self.assertEqual(f.readline(), b"abc\n")
557 self.assertEqual(f.readline(10), b"def\n")
558 self.assertEqual(f.readline(2), b"xy")
559 self.assertEqual(f.readline(4), b"zzy\n")
560 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000561 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000562 self.assertRaises(TypeError, f.readline, 5.3)
563 with self.open(support.TESTFN, "r") as f:
564 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000565
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300566 def test_readline_nonsizeable(self):
567 # Issue #30061
568 # Crash when readline() returns an object without __len__
569 class R(self.IOBase):
570 def readline(self):
571 return None
572 self.assertRaises((TypeError, StopIteration), next, R())
573
574 def test_next_nonsizeable(self):
575 # Issue #30061
576 # Crash when __next__() returns an object without __len__
577 class R(self.IOBase):
578 def __next__(self):
579 return None
580 self.assertRaises(TypeError, R().readlines, 1)
581
Guido van Rossum28524c72007-02-27 05:47:44 +0000582 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000583 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000584 self.write_ops(f)
585 data = f.getvalue()
586 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000587 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000588 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000589
Guido van Rossum53807da2007-04-10 19:01:47 +0000590 def test_large_file_ops(self):
Mike53f7a7c2017-12-14 14:04:53 +0300591 # On Windows and Mac OSX this test consumes large resources; It takes
Victor Stinner8c663fd2017-11-08 14:44:44 -0800592 # a long time to build the >2 GiB file and takes >2 GiB of disk space
Guido van Rossum34d69e52007-04-10 20:08:41 +0000593 # therefore the resource must be enabled to run this test.
Victor Stinner937ee9e2018-06-26 02:11:06 +0200594 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600595 support.requires(
596 'largefile',
597 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000598 with self.open(support.TESTFN, "w+b", 0) as f:
599 self.large_file_ops(f)
600 with self.open(support.TESTFN, "w+b") as f:
601 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000602
603 def test_with_open(self):
Alexey Izbysheva2670562018-10-20 03:22:31 +0300604 for bufsize in (0, 100):
Guido van Rossum87429772007-04-10 21:06:59 +0000605 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000606 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000607 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000608 self.assertEqual(f.closed, True)
609 f = None
610 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000611 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000612 1/0
613 except ZeroDivisionError:
614 self.assertEqual(f.closed, True)
615 else:
616 self.fail("1/0 didn't raise an exception")
617
Antoine Pitrou08838b62009-01-21 00:55:13 +0000618 # issue 5008
619 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000620 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000621 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000622 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000623 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000624 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000625 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000626 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300627 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000628
    def test_destructor(self):
        # Dropping the last reference to an open FileIO subclass must run
        # __del__, close() and flush() in that order, and the data written
        # beforehand must be readable from the file afterwards.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                # Chain to the parent's __del__ only if it defines one.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        # The file is deliberately never closed explicitly, so a
        # ResourceWarning is expected while it is being collected.
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000654
    def _check_base_destructor(self, base):
        """Check that destroying a subclass of *base* runs __del__, close()
        and flush() in that order, with instance attributes still available.
        """
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                # Chain to the parent's __del__ only if it defines one.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        # Drop the only reference and force a collection so that the
        # destructor has run before the record is inspected.
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
684
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000685 def test_IOBase_destructor(self):
686 self._check_base_destructor(self.IOBase)
687
688 def test_RawIOBase_destructor(self):
689 self._check_base_destructor(self.RawIOBase)
690
691 def test_BufferedIOBase_destructor(self):
692 self._check_base_destructor(self.BufferedIOBase)
693
694 def test_TextIOBase_destructor(self):
695 self._check_base_destructor(self.TextIOBase)
696
Guido van Rossum87429772007-04-10 21:06:59 +0000697 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000698 with self.open(support.TESTFN, "wb") as f:
699 f.write(b"xxx")
700 with self.open(support.TESTFN, "rb") as f:
701 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000702
Guido van Rossumd4103952007-04-12 05:44:49 +0000703 def test_array_writes(self):
704 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000705 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000706 def check(f):
707 with f:
708 self.assertEqual(f.write(a), n)
709 f.writelines((a,))
710 check(self.BytesIO())
711 check(self.FileIO(support.TESTFN, "w"))
712 check(self.BufferedWriter(self.MockRawIO()))
713 check(self.BufferedRandom(self.MockRawIO()))
714 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000715
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000716 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000717 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000718 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000719
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000720 def test_read_closed(self):
721 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000722 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000723 with self.open(support.TESTFN, "r") as f:
724 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000725 self.assertEqual(file.read(), "egg\n")
726 file.seek(0)
727 file.close()
728 self.assertRaises(ValueError, file.read)
729
730 def test_no_closefd_with_filename(self):
731 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000732 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000733
def test_closefd_attr(self):
    # The raw layer's closefd attribute must mirror the closefd argument
    # that was given to open().
    with self.open(support.TESTFN, "wb") as writer:
        writer.write(b"egg\n")
    with self.open(support.TESTFN, "r") as owner:
        owner_raw = owner.buffer.raw
        self.assertEqual(owner_raw.closefd, True)
        borrowed = self.open(owner.fileno(), "r", closefd=False)
        borrowed_raw = borrowed.buffer.raw
        self.assertEqual(borrowed_raw.closefd, False)
741
def test_garbage_collection(self):
    # FileIO objects are collected, and collecting them flushes
    # all data to disk.
    # The file is deliberately left unclosed, so a ResourceWarning is
    # expected inside this block.
    with support.check_warnings(('', ResourceWarning)):
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Self-reference: the object sits in a cycle, so dropping the
        # name alone does not free it; the explicit collection must.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
    # The weak reference is dead once the object has been collected.
    self.assertIsNone(wr(), wr)
    # The data written before collection must be readable from disk.
    with self.open(support.TESTFN, "rb") as f:
        self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000755
def test_unbounded_file(self):
    # Issue #1174606: reading from an unbounded stream such as /dev/zero.
    zero = "/dev/zero"
    if not os.path.exists(zero):
        self.skipTest("{0} does not exist".format(zero))
    if sys.maxsize > 0x7FFFFFFF:
        self.skipTest("test can only run in a 32-bit address space")
    if support.real_max_memuse < support._2G:
        self.skipTest("test requires at least 2 GiB of memory")
    # An unlimited read() must raise OverflowError whether the stream is
    # opened unbuffered binary, buffered binary or text.
    variants = (
        ("rb", {"buffering": 0}),
        ("rb", {}),
        ("r", {}),
    )
    for mode, options in variants:
        with self.open(zero, mode, **options) as stream:
            with self.assertRaises(OverflowError):
                stream.read()
771
def check_flush_error_on_close(self, *args, **kwargs):
    # Test that the file is closed despite failed flush
    # and that flush() is called before file closed.
    #
    # *args and **kwargs are passed through unchanged to self.open().
    f = self.open(*args, **kwargs)
    # Set by bad_flush() to [f.closed] as observed when flush() runs;
    # stays empty if flush() is never called.
    closed = []
    def bad_flush():
        closed[:] = [f.closed]
        raise OSError()
    # Shadow flush() on the instance so that close() hits the failure.
    f.flush = bad_flush
    self.assertRaises(OSError, f.close) # exception not swallowed
    self.assertTrue(f.closed)
    self.assertTrue(closed)      # flush() called
    self.assertFalse(closed[0])  # flush() called before file closed
    f.flush = lambda: None  # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200786
def test_flush_error_on_close(self):
    # Issue #5700: io.FileIO calls flush() after file closed
    #
    # Run the shared check against each layer of the I/O stack, and for
    # each layer open the file by name, by an owned descriptor, and by a
    # borrowed descriptor (closefd=False).
    layers = (
        ('wb', {'buffering': 0}),   # raw file
        ('wb', {}),                 # buffered io
        ('w', {}),                  # text io
    )
    flags = os.O_WRONLY | os.O_CREAT
    for mode, options in layers:
        self.check_flush_error_on_close(support.TESTFN, mode, **options)
        # The stream takes ownership of this descriptor and closes it.
        fd = os.open(support.TESTFN, flags)
        self.check_flush_error_on_close(fd, mode, **options)
        # With closefd=False the descriptor remains ours; release it even
        # when the check fails so that a failing run does not leak it.
        fd = os.open(support.TESTFN, flags)
        try:
            self.check_flush_error_on_close(fd, mode, closefd=False,
                                            **options)
        finally:
            os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000810
def test_multi_close(self):
    # Repeated close() calls are accepted; flush() on the closed stream
    # raises ValueError.
    stream = self.open(support.TESTFN, "wb", buffering=0)
    for _ in range(3):
        stream.close()
    with self.assertRaises(ValueError):
        stream.flush()
817
def test_RawIOBase_read(self):
    # Exercise the default limited RawIOBase.read(n) implementation (which
    # calls readinto() internally).
    rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
    # Results expected from successive read(2) calls, in order.
    expected_chunks = (b"ab", b"c", b"d", None, b"ef", b"g", None, b"")
    for chunk in expected_chunks:
        self.assertEqual(rawio.read(2), chunk)
830
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400831 def test_types_have_dict(self):
832 test = (
833 self.IOBase(),
834 self.RawIOBase(),
835 self.TextIOBase(),
836 self.StringIO(),
837 self.BytesIO()
838 )
839 for obj in test:
840 self.assertTrue(hasattr(obj, "__dict__"))
841
def test_opener(self):
    # A custom opener supplies the descriptor, so the name passed to
    # open() does not have to exist.
    with self.open(support.TESTFN, "w") as writer:
        writer.write("egg\n")
    existing_fd = os.open(support.TESTFN, os.O_RDONLY)
    opener = lambda path, flags: existing_fd
    with self.open("non-existent", "r", opener=opener) as reader:
        self.assertEqual(reader.read(), "egg\n")
850
Barry Warsaw480e2852016-06-08 17:47:26 -0400851 def test_bad_opener_negative_1(self):
852 # Issue #27066.
853 def badopener(fname, flags):
854 return -1
855 with self.assertRaises(ValueError) as cm:
856 open('non-existent', 'r', opener=badopener)
857 self.assertEqual(str(cm.exception), 'opener returned -1')
858
859 def test_bad_opener_other_negative(self):
860 # Issue #27066.
861 def badopener(fname, flags):
862 return -2
863 with self.assertRaises(ValueError) as cm:
864 open('non-existent', 'r', opener=badopener)
865 self.assertEqual(str(cm.exception), 'opener returned -2')
866
def test_fileio_closefd(self):
    # Issue #4841
    # A FileIO created with closefd=False only borrows its descriptor:
    # neither re-initialising it nor closing it may close the descriptor.
    with self.open(__file__, 'rb') as f1, \
         self.open(__file__, 'rb') as f2:
        fileio = self.FileIO(f1.fileno(), closefd=False)
        # .__init__() must not close f1
        fileio.__init__(f2.fileno(), closefd=False)
        # Fails if f1's descriptor was closed by the re-initialisation.
        f1.readline()
        # .close() must not close f2
        fileio.close()
        # Fails if f2's descriptor was closed along with fileio.
        f2.readline()
878
def test_nonbuffered_textio(self):
    # Text mode with buffering=0 is rejected, and the failed open() must
    # not emit a ResourceWarning.
    with support.check_no_resource_warning(self), \
         self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300883
def test_invalid_newline(self):
    # An unsupported newline value is rejected, and the failed open()
    # must not emit a ResourceWarning.
    with support.check_no_resource_warning(self), \
         self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300888
def test_buffered_readinto_mixin(self):
    # Test the implementation provided by BufferedIOBase
    class FixedStream(self.BufferedIOBase):
        # Always yields the same five bytes, regardless of size.
        def read(self, size):
            return b"12345"
        read1 = read

    source = FixedStream()
    for name in ("readinto", "readinto1"):
        with self.subTest(name):
            target = byteslike(5)
            count = getattr(source, name)(target)
            self.assertEqual(count, 5)
            self.assertEqual(bytes(target), b"12345")
901
def test_fspath_support(self):
    # open() must accept path-like objects that wrap a str or bytes file
    # name and reject path-like objects that wrap anything else.
    # NOTE(review): FakePath is defined elsewhere in this file; presumably
    # its __fspath__ returns the wrapped value, or raises it when it is an
    # exception class -- confirm against its definition.
    def roundtrip(path):
        with self.open(path, "w") as out:
            out.write("egg\n")

        with self.open(path, "r") as src:
            self.assertEqual(src.read(), "egg\n")

    for name in (support.TESTFN, support.TESTFN.encode('utf-8')):
        roundtrip(FakePath(name))

    # Wraps an integer file descriptor.
    with self.open(support.TESTFN, "w") as f:
        wraps_fd = FakePath(f.fileno())
        with self.assertRaises(TypeError):
            self.open(wraps_fd, 'w')

    wraps_none = FakePath(None)
    with self.assertRaises(TypeError):
        self.open(wraps_none, 'w')

    wraps_exc = FakePath(FloatingPointError)
    with self.assertRaises(FloatingPointError):
        self.open(wraps_exc, 'w')

    # ensure that refcounting is correct with some error conditions
    with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
        self.open(FakePath(support.TESTFN), 'rwxa')
Ethan Furmand62548a2016-06-04 14:38:43 -0700929
def test_RawIOBase_readall(self):
    # Exercise the default unlimited RawIOBase.read() and readall()
    # implementations.
    for name in ("read", "readall"):
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
        everything = getattr(rawio, name)()
        self.assertEqual(everything, b"abcdefg")
937
938 def test_BufferedIOBase_readinto(self):
939 # Exercise the default BufferedIOBase.readinto() and readinto1()
940 # implementations (which call read() or read1() internally).
941 class Reader(self.BufferedIOBase):
942 def __init__(self, avail):
943 self.avail = avail
944 def read(self, size):
945 result = self.avail[:size]
946 self.avail = self.avail[size:]
947 return result
948 def read1(self, size):
949 """Returns no more than 5 bytes at once"""
950 return self.read(min(size, 5))
951 tests = (
952 # (test method, total data available, read buffer size, expected
953 # read size)
954 ("readinto", 10, 5, 5),
955 ("readinto", 10, 6, 6), # More than read1() can return
956 ("readinto", 5, 6, 5), # Buffer larger than total available
957 ("readinto", 6, 7, 6),
958 ("readinto", 10, 0, 0), # Empty buffer
959 ("readinto1", 10, 5, 5), # Result limited to single read1() call
960 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return
961 ("readinto1", 5, 6, 5), # Buffer larger than total available
962 ("readinto1", 6, 7, 5),
963 ("readinto1", 10, 0, 0), # Empty buffer
964 )
965 UNUSED_BYTE = 0x81
966 for test in tests:
967 with self.subTest(test):
968 method, avail, request, result = test
969 reader = Reader(bytes(range(avail)))
970 buffer = bytearray((UNUSED_BYTE,) * request)
971 method = getattr(reader, method)
972 self.assertEqual(method(buffer), result)
973 self.assertEqual(len(buffer), request)
974 self.assertSequenceEqual(buffer[:result], range(result))
975 unused = (UNUSED_BYTE,) * (request - result)
976 self.assertSequenceEqual(buffer[result:], unused)
977 self.assertEqual(len(reader.avail), avail - result)
978
Zackery Spytz28f07362018-07-17 00:31:44 -0600979 def test_close_assert(self):
980 class R(self.IOBase):
981 def __setattr__(self, name, value):
982 pass
983 def flush(self):
984 raise OSError()
985 f = R()
986 # This would cause an assertion failure.
987 self.assertRaises(OSError, f.close)
988
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200989
class CIOTest(IOTest):
    # Adds one regression test (issue #12149) on top of the shared IOTest
    # suite.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        # Self-reference puts the instance in a reference cycle.
        obj.obj = obj
        wr = weakref.ref(obj)
        # Drop the local names so that both the class and the instance
        # become unreachable before the collection below.
        del MyIO
        del obj
        support.gc_collect()
        # The instance must have been collected (without crashing).
        self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +00001009
class PyIOTest(IOTest):
    # Inherits every IOTest test unchanged and adds none of its own.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001012
Guido van Rossuma9e20242007-03-08 00:43:48 +00001013
@support.cpython_only
class APIMismatchTest(unittest.TestCase):
    # Compares the RawIOBase classes of the io and pyio modules in both
    # directions using support.detect_api_mismatch().

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        missing = support.detect_api_mismatch(
            pyio.RawIOBase, io.RawIOBase, ignore=('__weakref__',))
        self.assertEqual(
            missing, set(),
            msg='Python RawIOBase does not have all C RawIOBase methods')

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        missing = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        self.assertEqual(
            missing, set(),
            msg='C RawIOBase does not have all Python RawIOBase methods')
1027
1028
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001029class CommonBufferedTests:
1030 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
1031
def test_detach(self):
    # detach() returns the raw stream once; a second detach() raises
    # ValueError.
    raw = self.MockRawIO()
    buf = self.tp(raw)
    detached = buf.detach()
    self.assertIs(detached, raw)
    with self.assertRaises(ValueError):
        buf.detach()

    repr(buf)  # Should still work
1039
def test_fileno(self):
    # The buffered object reports fileno() 42 when wrapping the mock raw
    # stream.
    bufio = self.tp(self.MockRawIO())
    descriptor = bufio.fileno()
    self.assertEqual(42, descriptor)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001045
def test_invalid_args(self):
    bufio = self.tp(self.MockRawIO())
    # Invalid whence
    for whence in (-1, 9):
        with self.assertRaises(ValueError):
            bufio.seek(0, whence)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001052
def test_override_destructor(self):
    # Destroying an instance of a subclass that overrides __del__(),
    # close() and flush() must invoke the overrides, in that order.
    tp = self.tp
    # Each override appends its own marker: 1 = __del__, 2 = close,
    # 3 = flush.
    record = []
    class MyBufferedIO(tp):
        def __del__(self):
            record.append(1)
            # The base type may not define __del__ at all; chain to it
            # only when it exists.
            try:
                f = super().__del__
            except AttributeError:
                pass
            else:
                f()
        def close(self):
            record.append(2)
            super().close()
        def flush(self):
            record.append(3)
            super().flush()
    rawio = self.MockRawIO()
    bufio = MyBufferedIO(rawio)
    del bufio
    support.gc_collect()
    self.assertEqual(record, [1, 2, 3])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001076
def test_context_manager(self):
    # Test usability as a context manager
    bufio = self.tp(self.MockRawIO())
    with bufio:
        pass
    # bufio should now be closed, and using it a second time should raise
    # a ValueError.
    with self.assertRaises(ValueError):
        with bufio:
            pass
1088
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()
    def f():
        # The temporary buffered object is dropped while the
        # AttributeError raised by the missing attribute propagates.
        self.tp(rawio).xyzzy
    with support.captured_output("stderr") as s:
        # The AttributeError must reach the caller unchanged.
        self.assertRaises(AttributeError, f)
    s = s.getvalue().strip()
    if s:
        # The destructor *may* have printed an unraisable error, check it
        self.assertEqual(len(s.splitlines()), 1)
        self.assertTrue(s.startswith("Exception OSError: "), s)
        self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +00001103
def test_repr(self):
    # repr() shows the qualified class name and, once the raw stream has
    # a name attribute, the repr of that name as well.
    raw = self.MockRawIO()
    buffered = self.tp(raw)
    clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
    self.assertEqual(repr(buffered), "<%s>" % clsname)
    named = (
        ("dummy", "<%s name='dummy'>"),
        (b"dummy", "<%s name=b'dummy'>"),
    )
    for name, template in named:
        raw.name = name
        self.assertEqual(repr(buffered), template % clsname)
1113
def test_recursive_repr(self):
    # Issue #25455
    raw = self.MockRawIO()
    b = self.tp(raw)
    # Point the raw stream's name at the buffered object itself, so that
    # repr(b) refers back to b.
    with support.swap_attr(raw, 'name', b):
        try:
            repr(b)  # Should not crash
        except RuntimeError:
            # A RuntimeError is an acceptable outcome; only a crash is
            # a failure.
            pass
1123
def test_flush_error_on_close(self):
    # Test that buffered file is closed despite failed flush
    # and that flush() is called before file closed.
    raw = self.MockRawIO()
    # Set by bad_flush() to [b.closed, raw.closed] as observed when the
    # raw flush() runs; stays empty if flush() is never called.
    closed = []
    def bad_flush():
        closed[:] = [b.closed, raw.closed]
        raise OSError()
    raw.flush = bad_flush
    # bad_flush() looks up b only when called, so binding it after the
    # function definition is fine.
    b = self.tp(raw)
    self.assertRaises(OSError, b.close) # exception not swallowed
    self.assertTrue(b.closed)
    self.assertTrue(raw.closed)
    self.assertTrue(closed)      # flush() called
    self.assertFalse(closed[0])  # flush() called before file closed
    self.assertFalse(closed[1])
    raw.flush = lambda: None  # break reference loop
Benjamin Peterson68623612012-12-20 11:53:11 -06001141
def test_close_error_on_close(self):
    # When both flush() and the raw close() fail, the close error is
    # raised with the flush error chained as its __context__.
    raw = self.MockRawIO()
    def bad_flush():
        raise OSError('flush')
    def bad_close():
        raise OSError('close')
    raw.close = bad_close
    b = self.tp(raw)
    b.flush = bad_flush
    with self.assertRaises(OSError) as err: # exception not swallowed
        b.close()
    self.assertEqual(err.exception.args, ('close',))
    self.assertIsInstance(err.exception.__context__, OSError)
    self.assertEqual(err.exception.__context__.args, ('flush',))
    # The raw close() failed, so the object must not report itself closed.
    self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00001157
def test_nonnormalized_close_error_on_close(self):
    # Issue #21677
    # Same scenario as a failing flush() plus a failing close(), but the
    # errors are NameErrors raised by the interpreter for the undefined
    # names below rather than exceptions instantiated explicitly.
    raw = self.MockRawIO()
    def bad_flush():
        # Intentionally undefined name: evaluating it raises NameError.
        raise non_existing_flush
    def bad_close():
        # Intentionally undefined name: evaluating it raises NameError.
        raise non_existing_close
    raw.close = bad_close
    b = self.tp(raw)
    b.flush = bad_flush
    with self.assertRaises(NameError) as err: # exception not swallowed
        b.close()
    self.assertIn('non_existing_close', str(err.exception))
    self.assertIsInstance(err.exception.__context__, NameError)
    self.assertIn('non_existing_flush', str(err.exception.__context__))
    self.assertFalse(b.closed)
1174
def test_multi_close(self):
    # Repeated close() calls are accepted; flush() on the closed object
    # raises ValueError.
    buffered = self.tp(self.MockRawIO())
    for _ in range(3):
        buffered.close()
    with self.assertRaises(ValueError):
        buffered.flush()
1182
def test_unseekable(self):
    # tell() and seek() on top of an unseekable raw stream must raise
    # UnsupportedOperation.
    bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
    with self.assertRaises(self.UnsupportedOperation):
        bufio.tell()
    with self.assertRaises(self.UnsupportedOperation):
        bufio.seek(0)
1187
def test_readonly_attributes(self):
    # The raw attribute of a buffered object cannot be reassigned.
    buf = self.tp(self.MockRawIO())
    replacement = self.MockRawIO()
    with self.assertRaises(AttributeError):
        buf.raw = replacement
1194
Guido van Rossum78892e42007-04-06 17:31:18 +00001195
class SizeofTest:
    # Mixin: uses self.tp and self.MockRawIO, which this class does not
    # define itself.

    @support.cpython_only
    def test_sizeof(self):
        # sys.getsizeof() must include the internal buffer: the reported
        # size minus buffer_size is the same for both buffer sizes.
        small, large = 4096, 8192
        bufio = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(bufio) - small
        bufio = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(bufio), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the buffer no longer counts toward the size.
        bufsize = 4096
        bufio = self.tp(self.MockRawIO(), buffer_size=bufsize)
        overhead = sys.getsizeof(bufio) - bufsize
        bufio.close()
        self.assertEqual(sys.getsizeof(bufio), overhead)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001217
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001218class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1219 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001220
def test_constructor(self):
    # Re-running __init__() on a live object is accepted for valid buffer
    # sizes and raises ValueError for non-positive ones.
    rawio = self.MockRawIO([b"abc"])
    bufio = self.tp(rawio)
    for options in ({}, {"buffer_size": 1024}, {"buffer_size": 16}):
        bufio.__init__(rawio, **options)
    self.assertEqual(b"abc", bufio.read())
    for bad_size in (0, -16, -1):
        with self.assertRaises(ValueError):
            bufio.__init__(rawio, buffer_size=bad_size)
    # After the rejected calls the object can still be re-initialised on
    # a fresh raw stream and read from.
    rawio = self.MockRawIO([b"abc"])
    bufio.__init__(rawio)
    self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001234
def test_uninitialized(self):
    # An object created through __new__() alone (no __init__()) must be
    # safe to destroy and must fail cleanly when used.
    bufio = self.tp.__new__(self.tp)
    # Destroying the never-initialised object must not misbehave.
    del bufio
    bufio = self.tp.__new__(self.tp)
    # Either exception type, with either message, is accepted.
    self.assertRaisesRegex((ValueError, AttributeError),
                           'uninitialized|has no attribute',
                           bufio.read, 0)
    # Once initialised, the very same object works normally.
    bufio.__init__(self.MockRawIO())
    self.assertEqual(bufio.read(0), b'')
1244
def test_read(self):
    # read(None) and read(7) both return all seven bytes.
    for size in (None, 7):
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
        self.assertEqual(b"abcdefg", bufio.read(size))
    # Invalid args
    with self.assertRaises(ValueError):
        bufio.read(-2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001252
def test_read1(self):
    # read1() interleaved with checks of rawio._reads.
    # NOTE(review): _reads is maintained by the mock raw stream defined
    # elsewhere; presumably it counts raw read calls -- confirm.
    rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(rawio)
    self.assertEqual(b"a", bufio.read(1))
    self.assertEqual(b"b", bufio.read1(1))
    self.assertEqual(rawio._reads, 1)
    # A zero-length request returns empty bytes.
    self.assertEqual(b"", bufio.read1(0))
    # Only the remaining b"c" is returned, and _reads is unchanged.
    self.assertEqual(b"c", bufio.read1(100))
    self.assertEqual(rawio._reads, 1)
    # From here on _reads grows by one per read1() call.
    self.assertEqual(b"d", bufio.read1(100))
    self.assertEqual(rawio._reads, 2)
    self.assertEqual(b"efg", bufio.read1(100))
    self.assertEqual(rawio._reads, 3)
    # The chunks are used up: read1() returns empty bytes.
    self.assertEqual(b"", bufio.read1(100))
    self.assertEqual(rawio._reads, 4)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001268
def test_read1_arbitrary(self):
    # read1() with no argument, or with -1, returns whatever one step
    # yields.
    raw = self.MockRawIO((b"abc", b"d", b"efg"))
    reader = self.tp(raw)
    self.assertEqual(b"a", reader.read(1))
    # (arguments to read1(), expected result), in call order.
    steps = (
        ((), b"bc"),
        ((), b"d"),
        ((-1,), b"efg"),
    )
    for args, chunk in steps:
        self.assertEqual(chunk, reader.read1(*args))
    self.assertEqual(raw._reads, 3)
    self.assertEqual(b"", reader.read1())
    self.assertEqual(raw._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001279
def test_readinto(self):
    # readinto() returns the number of bytes stored; bytes of the target
    # beyond that count keep their previous contents.
    b = bytearray(2)
    bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
    # (count returned, target contents afterwards), in call order.
    steps = ((2, b"ab"), (2, b"cd"), (2, b"ef"), (1, b"gf"), (0, b"gf"))
    for count, contents in steps:
        self.assertEqual(bufio.readinto(b), count)
        self.assertEqual(b, contents)
    # Same target, new stream whose raw layer yields None after b"abc".
    bufio = self.tp(self.MockRawIO((b"abc", None)))
    for count, contents in ((2, b"ab"), (1, b"cb")):
        self.assertEqual(bufio.readinto(b), count)
        self.assertEqual(b, contents)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001300
def test_readinto1(self):
    # readinto1() interleaved with peek() and checks of rawio._reads.
    # NOTE(review): _reads is maintained by the mock raw stream defined
    # elsewhere; presumably it counts raw read calls -- confirm.
    buffer_size = 10
    rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
    bufio = self.tp(rawio, buffer_size=buffer_size)
    b = bytearray(2)
    self.assertEqual(bufio.peek(3), b'abc')
    self.assertEqual(rawio._reads, 1)
    # The next two calls leave _reads unchanged.
    self.assertEqual(bufio.readinto1(b), 2)
    self.assertEqual(b, b"ab")
    self.assertEqual(rawio._reads, 1)
    self.assertEqual(bufio.readinto1(b), 1)
    # Only the first byte of b was written by this call.
    self.assertEqual(b[:1], b"c")
    self.assertEqual(rawio._reads, 1)
    self.assertEqual(bufio.readinto1(b), 2)
    self.assertEqual(b, b"de")
    self.assertEqual(rawio._reads, 2)
    # Switch to a target twice as large as the internal buffer.
    b = bytearray(2*buffer_size)
    self.assertEqual(bufio.peek(3), b'fgh')
    self.assertEqual(rawio._reads, 3)
    self.assertEqual(bufio.readinto1(b), 6)
    self.assertEqual(b[:6], b"fghjkl")
    self.assertEqual(rawio._reads, 4)
1323
def test_readinto_array(self):
    # readinto() must size its read by the target's length in *bytes*,
    # not by its number of elements.
    buffer_size = 60
    data = b"a" * 26
    rawio = self.MockRawIO((data,))
    bufio = self.tp(rawio, buffer_size=buffer_size)

    # Create an array with element size > 1 byte
    b = array.array('i', b'x' * 32)
    # Verify that precondition with a unittest assertion: a bare assert
    # is stripped under -O, and checking itemsize states the requirement
    # directly instead of comparing the element count with a constant.
    self.assertGreater(b.itemsize, 1)

    # Read into it. We should get as many *bytes* as we can fit into b
    # (which is more than the number of elements)
    n = bufio.readinto(b)
    self.assertGreater(n, len(b))

    # Check that old contents of b are preserved
    bm = memoryview(b).cast('B')
    self.assertLess(n, len(bm))
    self.assertEqual(bm[:n], data[:n])
    self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1344
def test_readinto1_array(self):
    # readinto1() must size its read by the target's length in *bytes*,
    # not by its number of elements.
    buffer_size = 60
    data = b"a" * 26
    rawio = self.MockRawIO((data,))
    bufio = self.tp(rawio, buffer_size=buffer_size)

    # Create an array with element size > 1 byte
    b = array.array('i', b'x' * 32)
    # Verify that precondition with a unittest assertion: a bare assert
    # is stripped under -O, and checking itemsize states the requirement
    # directly instead of comparing the element count with a constant.
    self.assertGreater(b.itemsize, 1)

    # Read into it. We should get as many *bytes* as we can fit into b
    # (which is more than the number of elements)
    n = bufio.readinto1(b)
    self.assertGreater(n, len(b))

    # Check that old contents of b are preserved
    bm = memoryview(b).cast('B')
    self.assertLess(n, len(bm))
    self.assertEqual(bm[:n], data[:n])
    self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1365
def test_readlines(self):
    # readlines() with no hint, a hint of 5, and an explicit None hint.
    def fresh():
        return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))

    # (arguments to readlines(), expected lines)
    cases = (
        ((), [b"abc\n", b"d\n", b"ef"]),
        ((5,), [b"abc\n", b"d\n"]),
        ((None,), [b"abc\n", b"d\n", b"ef"]),
    )
    for args, lines in cases:
        self.assertEqual(fresh().readlines(*args), lines)
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001373
def test_buffering(self):
    # For several buffer sizes, check both the data returned by a series
    # of buffered reads and the read history recorded by the raw stream.
    data = b"abcdefghi"
    total = len(data)

    # (buffer size, sizes requested from the buffered reader, expected
    # raw read history)
    scenarios = (
        (100, [3, 1, 4, 8], [total, 0]),
        (100, [3, 3, 3], [total]),
        (4, [1, 2, 4, 2], [4, 4, 1]),
    )

    for bufsize, requests, raw_history in scenarios:
        rawio = self.MockFileIO(data)
        bufio = self.tp(rawio, buffer_size=bufsize)
        start = 0
        for nbytes in requests:
            stop = start + nbytes
            self.assertEqual(bufio.read(nbytes), data[start:stop])
            start = stop
        # this is mildly implementation-dependent
        self.assertEqual(rawio.read_history, raw_history)
Guido van Rossum78892e42007-04-06 17:31:18 +00001393
def test_read_non_blocking(self):
    # Inject some None's in there to simulate EWOULDBLOCK
    chunks = (b"abc", b"d", None, b"efg", None, None, None)
    reader = self.tp(self.MockRawIO(chunks))
    # (expected bytes, arguments to read()), in call order.
    for expected, args in ((b"abcd", (6,)), (b"e", (1,)), (b"fg", ())):
        self.assertEqual(expected, reader.read(*args))
    self.assertEqual(b"", reader.peek(1))
    self.assertIsNone(reader.read())
    self.assertEqual(b"", reader.read())

    # The raw stream's readall(): data first, then None.
    raw = self.MockRawIO((b"a", None, None))
    self.assertEqual(b"a", raw.readall())
    self.assertIsNone(raw.readall())
1408
def test_read_past_eof(self):
    # Asking for more bytes than exist returns everything available.
    reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
    everything = reader.read(9000)
    self.assertEqual(b"abcdefg", everything)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001414
def test_read_all(self):
    # read() without a size returns all chunks joined together.
    reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
    everything = reader.read()
    self.assertEqual(b"abcdefg", everything)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001420
@support.requires_resource('cpu')
def test_threads(self):
    # Concurrent reads from one buffered reader must neither duplicate
    # nor lose data.
    try:
        # Write out many bytes with exactly the same number of 0's,
        # 1's... 255's. This will help us check that concurrent reading
        # doesn't duplicate or forget contents.
        N = 1000
        l = list(range(256)) * N
        random.shuffle(l)
        s = bytes(bytearray(l))
        with self.open(support.TESTFN, "wb") as f:
            f.write(s)
        with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
            # Buffer size 8, so a 1-byte read fits in the buffer and a
            # 19-byte read does not (see the comment inside f()).
            bufio = self.tp(raw, 8)
            errors = []
            results = []
            def f():
                # Thread body: read until EOF, collecting every chunk.
                try:
                    # Intra-buffer read then buffer-flushing read
                    for n in cycle([1, 19]):
                        s = bufio.read(n)
                        if not s:
                            break
                        # list.append() is atomic
                        results.append(s)
                except Exception as e:
                    # Record the failure for the main thread's assertion,
                    # then let it propagate in this thread.
                    errors.append(e)
                    raise
            threads = [threading.Thread(target=f) for x in range(20)]
            with support.start_threads(threads):
                time.sleep(0.02) # yield
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            # Chunk order across threads is arbitrary, so compare byte
            # counts: each value must occur exactly N times.
            s = b''.join(results)
            for i in range(256):
                c = bytes(bytearray([i]))
                self.assertEqual(s.count(c), N)
    finally:
        support.unlink(support.TESTFN)
1460
def test_unseekable(self):
    # tell() and seek() must raise UnsupportedOperation both before and
    # after some data has been read.
    reader = self.tp(self.MockUnseekableIO(b"A" * 10))
    with self.assertRaises(self.UnsupportedOperation):
        reader.tell()
    with self.assertRaises(self.UnsupportedOperation):
        reader.seek(0)
    reader.read(1)
    with self.assertRaises(self.UnsupportedOperation):
        reader.seek(0)
    with self.assertRaises(self.UnsupportedOperation):
        reader.tell()
1468
def test_misbehaved_io(self):
    # On top of a MisbehavedRawIO, seek() and tell() are expected to
    # raise OSError.
    raw = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
    reader = self.tp(raw)
    with self.assertRaises(OSError):
        reader.seek(0)
    with self.assertRaises(OSError):
        reader.tell()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001474
def test_no_extraneous_read(self):
    # Issue #9550; when the raw IO object has satisfied the read request,
    # we should not issue any additional reads, otherwise it may block
    # (e.g. socket).
    bufsize = 16
    for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
        expected = b"x" * n
        raw_layouts = (
            # Simple case: one raw read is enough to satisfy the request.
            [expected],
            # A more complex case where two raw reads are needed to
            # satisfy the request.
            [b"x" * (n - 1), b"x"],
        )
        for layout in raw_layouts:
            rawio = self.MockRawIO(layout)
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), expected)
            self.assertEqual(rawio._extraneous_reads, 0,
                "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1494
def test_read_on_closed(self):
    # Issue #23796
    # NOTE(review): this builds io.BufferedReader directly instead of
    # self.tp, so every subclass runs it against the same class --
    # confirm whether that is intentional.
    reader = io.BufferedReader(io.BytesIO(b"12"))
    reader.read(1)
    reader.close()
    with self.assertRaises(ValueError):
        reader.peek()
    with self.assertRaises(ValueError):
        reader.read1(1)
1502
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001503
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the shared reader tests with tp bound to io.BufferedReader and
    # adds checks that are defined only on this class.
    tp = io.BufferedReader

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        reader = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            reader.__init__(raw, sys.maxsize)

    def test_initialization(self):
        raw = self.MockRawIO([b"abc"])
        reader = self.tp(raw)
        # After each rejected buffer size, read() must raise ValueError too.
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                reader.__init__(raw, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                reader.read()

    def test_misbehaved_io_read(self):
        raw = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        # _pyio.BufferedReader seems to implement reading differently, so
        # this check lives only on the C test class.
        with self.assertRaises(OSError):
            reader.read(10)

    def test_garbage_collection(self):
        # A buffered reader that references itself (a reference cycle)
        # must be gone after a collection; a ResourceWarning is tolerated.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            bufio = self.tp(rawio)
            bufio.f = bufio
            wr = weakref.ref(bufio)
            del bufio
            support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1553
1554
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared reader tests with tp bound to pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001557
Guido van Rossuma9e20242007-03-08 00:43:48 +00001558
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001559class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1560 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001561
def test_constructor(self):
    # Re-running __init__ with valid sizes keeps the writer usable;
    # invalid sizes raise ValueError, and a later valid __init__ makes
    # the object writable again.
    raw = self.MockRawIO()
    writer = self.tp(raw)
    writer.__init__(raw)
    writer.__init__(raw, buffer_size=1024)
    writer.__init__(raw, buffer_size=16)
    self.assertEqual(3, writer.write(b"abc"))
    writer.flush()
    for bad_size in (0, -16, -1):
        with self.assertRaises(ValueError):
            writer.__init__(raw, buffer_size=bad_size)
    writer.__init__(raw)
    self.assertEqual(3, writer.write(b"ghi"))
    writer.flush()
    self.assertEqual(b"".join(raw._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001577
def test_uninitialized(self):
    # An object created with __new__ only can be deleted without
    # trouble, rejects write() until __init__ has run, and works after.
    writer = self.tp.__new__(self.tp)
    del writer
    writer = self.tp.__new__(self.tp)
    with self.assertRaisesRegex((ValueError, AttributeError),
                                'uninitialized|has no attribute'):
        writer.write(b'')
    writer.__init__(self.MockRawIO())
    self.assertEqual(writer.write(b''), 0)
1587
def test_detach_flush(self):
    # Data stays buffered until detach(), which must push it to the raw
    # stream as a single write.
    raw = self.MockRawIO()
    writer = self.tp(raw)
    writer.write(b"howdy!")
    self.assertFalse(raw._write_stack)
    writer.detach()
    self.assertEqual(raw._write_stack, [b"howdy!"])
1595
def test_write(self):
    # Write to the buffered IO but don't overflow the buffer.
    raw = self.MockRawIO()
    bufio = self.tp(raw, 8)
    bufio.write(b"abc")
    self.assertFalse(raw._write_stack)
    scratch = bytearray(b"def")
    bufio.write(scratch)
    # Overwrite our copy of the data; the flushed output must still
    # contain the bytes that were written.
    scratch[:] = b"***"
    bufio.flush()
    self.assertEqual(b"".join(raw._write_stack), b"abcdef")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001607
def test_write_overflow(self):
    raw = self.MockRawIO()
    bufio = self.tp(raw, 8)
    contents = b"abcdefghijklmnop"
    # Feed the data in 3-byte slices so the 8-byte buffer overflows.
    for start in range(0, len(contents), 3):
        bufio.write(contents[start:start + 3])
    flushed = b"".join(raw._write_stack)
    # At least (total - 8) bytes were implicitly flushed, perhaps more
    # depending on the implementation.
    self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001618
def check_writes(self, intermediate_func):
    # Lots of writes, test the flushed output is as expected.
    # intermediate_func(bufio) is called after every single write.
    contents = bytes(range(256)) * 1000
    total = len(contents)
    raw = self.MockRawIO()
    bufio = self.tp(raw, 13)
    # Write sizes: each size N is used 15 times before moving to N+1.
    sizes = (size for size in count(1) for _ in range(15))
    offset = 0
    while offset < total:
        size = min(next(sizes), total - offset)
        written = bufio.write(contents[offset:offset + size])
        self.assertEqual(written, size)
        intermediate_func(bufio)
        offset += size
    bufio.flush()
    self.assertEqual(contents, b"".join(raw._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001638
def test_writes(self):
    # Plain writes: nothing happens between consecutive writes.
    def do_nothing(bufio):
        return None
    self.check_writes(do_nothing)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001641
def test_writes_and_flushes(self):
    # Flush explicitly after every write.
    def flush_it(bufio):
        return bufio.flush()
    self.check_writes(flush_it)
Guido van Rossum01a27522007-03-07 01:00:12 +00001644
def test_writes_and_seeks(self):
    # After every write, move the position around and put it back;
    # once with absolute seeks, once with relative ones.
    def seek_absolute(bufio):
        here = bufio.tell()
        bufio.seek(here + 1, 0)
        bufio.seek(here - 1, 0)
        bufio.seek(here, 0)

    def seek_relative(bufio):
        here = bufio.seek(0, 1)
        bufio.seek(+1, 1)
        bufio.seek(-1, 1)
        bufio.seek(here, 0)

    self.check_writes(seek_absolute)
    self.check_writes(seek_relative)
Guido van Rossum01a27522007-03-07 01:00:12 +00001658
def test_writes_and_truncates(self):
    # Truncate at the current position after every write.
    def truncate_here(bufio):
        return bufio.truncate(bufio.tell())
    self.check_writes(truncate_here)
Guido van Rossum01a27522007-03-07 01:00:12 +00001661
def test_write_non_blocking(self):
    raw = self.MockNonBlockWriterIO()
    bufio = self.tp(raw, 8)

    first = bufio.write(b"abcd")
    self.assertEqual(first, 4)
    second = bufio.write(b"efghi")
    self.assertEqual(second, 5)
    # 1 byte will be written, the rest will be buffered
    raw.block_on(b"k")
    third = bufio.write(b"jklmn")
    self.assertEqual(third, 5)

    # 8 bytes will be written, 8 will be buffered and the rest will be lost
    raw.block_on(b"0")
    try:
        bufio.write(b"opqrwxyz0123456789")
    except self.BlockingIOError as exc:
        written = exc.characters_written
    else:
        self.fail("BlockingIOError should have been raised")
    self.assertEqual(written, 16)
    self.assertEqual(raw.pop_written(),
        b"abcdefghijklmnopqrwxyz")

    fourth = bufio.write(b"ABCDEFGHI")
    self.assertEqual(fourth, 9)
    flushed = raw.pop_written()
    # Previously buffered bytes were flushed
    self.assertTrue(flushed.startswith(b"01234567A"), flushed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001688
def test_write_and_rewind(self):
    # Overwrite the start of already written data, seek back to the
    # end, then append; the raw stream must reflect each step.
    sink = io.BytesIO()
    bufio = self.tp(sink, 4)
    self.assertEqual(bufio.write(b"abcdef"), 6)
    self.assertEqual(bufio.tell(), 6)
    bufio.seek(0, 0)
    self.assertEqual(bufio.write(b"XY"), 2)
    bufio.seek(6, 0)
    self.assertEqual(sink.getvalue(), b"XYcdef")
    self.assertEqual(bufio.write(b"123456"), 6)
    bufio.flush()
    self.assertEqual(sink.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001701
def test_flush(self):
    # An explicit flush() hands the buffered bytes to the raw stream.
    raw = self.MockRawIO()
    bufio = self.tp(raw, 8)
    bufio.write(b"abc")
    bufio.flush()
    first_write = raw._write_stack[0]
    self.assertEqual(b"abc", first_write)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001708
def test_writelines(self):
    # writelines() with a plain list of bytes objects.
    lines = [b'ab', b'cd', b'ef']
    raw = self.MockRawIO()
    bufio = self.tp(raw, 8)
    bufio.writelines(lines)
    bufio.flush()
    self.assertEqual(b''.join(raw._write_stack), b'abcdef')
1716
def test_writelines_userlist(self):
    # writelines() with a UserList rather than a real list.
    lines = UserList([b'ab', b'cd', b'ef'])
    raw = self.MockRawIO()
    bufio = self.tp(raw, 8)
    bufio.writelines(lines)
    bufio.flush()
    self.assertEqual(b''.join(raw._write_stack), b'abcdef')
1724
def test_writelines_error(self):
    # Ints, None and a str are each rejected with TypeError.
    raw = self.MockRawIO()
    bufio = self.tp(raw, 8)
    for bad_lines in ([1, 2, 3], None, 'abc'):
        with self.assertRaises(TypeError):
            bufio.writelines(bad_lines)
1731
def test_destructor(self):
    # Dropping the last reference to the writer must still deliver the
    # buffered bytes to the raw stream.
    raw = self.MockRawIO()
    bufio = self.tp(raw, 8)
    bufio.write(b"abc")
    del bufio
    support.gc_collect()
    first_write = raw._write_stack[0]
    self.assertEqual(b"abc", first_write)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001739
def test_truncate(self):
    # Truncate implicitly flushes the buffer.
    self.addCleanup(support.unlink, support.TESTFN)
    with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
        bufio = self.tp(raw, 8)
        bufio.write(b"abcdef")
        new_size = bufio.truncate(3)
        self.assertEqual(new_size, 3)
        # The position is expected to stay where the write left it.
        self.assertEqual(bufio.tell(), 6)
    with self.open(support.TESTFN, "rb", buffering=0) as check:
        self.assertEqual(check.read(), b"abc")
1750
def test_truncate_after_write(self):
    # Ensure that truncate preserves the file position after
    # writes longer than the buffer size.
    # Issue: https://bugs.python.org/issue32228
    self.addCleanup(support.unlink, support.TESTFN)
    with self.open(support.TESTFN, "wb") as seed:
        # Give the file some initial content.
        seed.write(b'\x00' * 10000)
    for buffer_size in (8192, 4096, 200):
        with self.open(support.TESTFN, "r+b", buffering=buffer_size) as bufio:
            bufio.write(b'\x00' * (buffer_size + 1))
            # After write write_pos and write_end are set to 0
            bufio.read(1)
            # read operation makes sure that pos != raw_pos
            bufio.truncate()
            self.assertEqual(bufio.tell(), buffer_size + 2)
1768
@support.requires_resource('cpu')
def test_threads(self):
    try:
        # Write out many bytes from many threads and test they were
        # all flushed.
        per_value = 1000
        contents = bytes(range(256)) * per_value
        sizes = cycle([1, 19])
        pending = deque()
        offset = 0
        while offset < len(contents):
            size = next(sizes)
            pending.append(contents[offset:offset + size])
            offset += size
        del contents
        # We use a real file object because it allows us to
        # exercise situations where the GIL is released before
        # writing the buffer to the raw streams. This is in addition
        # to concurrency issues due to switching threads in the middle
        # of Python code.
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            errors = []

            def writer():
                try:
                    while True:
                        try:
                            piece = pending.popleft()
                        except IndexError:
                            # Queue drained: this worker is done.
                            return
                        bufio.write(piece)
                except Exception as exc:
                    errors.append(exc)
                    raise

            workers = [threading.Thread(target=writer) for _ in range(20)]
            with support.start_threads(workers):
                time.sleep(0.02)  # yield
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            bufio.close()
        with self.open(support.TESTFN, "rb") as check:
            written = check.read()
        for value in range(256):
            self.assertEqual(written.count(bytes([value])), per_value)
    finally:
        support.unlink(support.TESTFN)
1815
def test_misbehaved_io(self):
    # On top of a MisbehavedRawIO, seek(), tell() and an over-sized
    # write() are each expected to raise OSError.
    raw = self.MisbehavedRawIO()
    bufio = self.tp(raw, 5)
    with self.assertRaises(OSError):
        bufio.seek(0)
    with self.assertRaises(OSError):
        bufio.tell()
    with self.assertRaises(OSError):
        bufio.write(b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001822
def test_max_buffer_size_removal(self):
    # A third positional argument is rejected with TypeError.
    raw = self.MockRawIO()
    self.assertRaises(TypeError, self.tp, raw, 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001826
def test_write_error_on_close(self):
    # If the raw write fails while close() flushes, the error must
    # propagate and the buffered object must still end up closed.
    def failing_write(b):
        raise OSError()

    raw = self.MockRawIO()
    raw.write = failing_write
    bufio = self.tp(raw)
    bufio.write(b'spam')
    with self.assertRaises(OSError):  # exception not swallowed
        bufio.close()
    self.assertTrue(bufio.closed)
1836
def test_slow_close_from_thread(self):
    # Issue #31976
    # close() runs in a helper thread; once the raw object signals
    # in_flush, a concurrent write() must raise ValueError and the
    # buffered object must already report itself as closed.
    # NOTE(review): presumably in_flush is an event set by the mock's
    # flush() -- confirm against SlowFlushRawIO.
    rawio = self.SlowFlushRawIO()
    bufio = self.tp(rawio, 8)
    closer = threading.Thread(target=bufio.close)
    closer.start()
    try:
        rawio.in_flush.wait()
        self.assertRaises(ValueError, bufio.write, b'spam')
        self.assertTrue(bufio.closed)
    finally:
        # Always wait for the helper thread, even when an assertion
        # above fails, so it cannot outlive this test.
        closer.join()
1847
1848
Benjamin Peterson59406a92009-03-26 17:10:29 +00001849
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared writer tests with tp bound to io.BufferedWriter and
    # adds checks that are defined only on this class.
    tp = io.BufferedWriter

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        writer = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            writer.__init__(raw, sys.maxsize)

    def test_initialization(self):
        raw = self.MockRawIO()
        writer = self.tp(raw)
        # After each rejected buffer size, write() must raise ValueError
        # too.
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                writer.__init__(raw, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                writer.write(b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them
        # flushes all data to disk.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            bufio = self.tp(rawio)
            bufio.write(b"123xxx")
            # Self-reference: only the cycle collector can free it.
            bufio.x = bufio
            wr = weakref.ref(bufio)
            del bufio
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as check:
            self.assertEqual(check.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1896
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001897
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared writer tests with tp bound to pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001900
Guido van Rossum01a27522007-03-07 01:00:12 +00001901class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001902
def test_constructor(self):
    # A freshly built pair must not report itself as closed.
    reader = self.MockRawIO()
    writer = self.MockRawIO()
    pair = self.tp(reader, writer)
    self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001906
def test_uninitialized(self):
    # An object created with __new__ only can be deleted without
    # trouble, rejects read() and write() until __init__ has run, and
    # works afterwards.
    pair = self.tp.__new__(self.tp)
    del pair
    pair = self.tp.__new__(self.tp)
    with self.assertRaisesRegex((ValueError, AttributeError),
                                'uninitialized|has no attribute'):
        pair.read(0)
    with self.assertRaisesRegex((ValueError, AttributeError),
                                'uninitialized|has no attribute'):
        pair.write(b'')
    pair.__init__(self.MockRawIO(), self.MockRawIO())
    self.assertEqual(pair.read(0), b'')
    self.assertEqual(pair.write(b''), 0)
1920
def test_detach(self):
    # detach() on a pair raises UnsupportedOperation.
    pair = self.tp(self.MockRawIO(), self.MockRawIO())
    with self.assertRaises(self.UnsupportedOperation):
        pair.detach()
1924
def test_constructor_max_buffer_size_removal(self):
    # A fourth positional argument is rejected with TypeError.
    reader = self.MockRawIO()
    writer = self.MockRawIO()
    self.assertRaises(TypeError, self.tp, reader, writer, 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001928
def test_constructor_with_not_readable(self):
    # A reader whose readable() returns False makes the constructor
    # raise OSError.
    class NotReadable(MockRawIO):
        def readable(self):
            return False

    with self.assertRaises(OSError):
        self.tp(NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001935
def test_constructor_with_not_writeable(self):
    # A writer whose writable() returns False makes the constructor
    # raise OSError.
    class NotWriteable(MockRawIO):
        def writable(self):
            return False

    with self.assertRaises(OSError):
        self.tp(self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001942
def test_read(self):
    # Sized reads, then an unsized read for the remainder.
    pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
    for size, expected in ((3, b"abc"), (1, b"d")):
        self.assertEqual(pair.read(size), expected)
    self.assertEqual(pair.read(), b"ef")

    # read(None) on a fresh pair returns the whole content.
    pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
    self.assertEqual(pair.read(None), b"abc")
1951
def test_readlines(self):
    # Every assertion uses a fresh pair so that each readlines() call
    # starts from the beginning of the data.
    def make_pair():
        return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

    all_lines = [b"abc\n", b"def\n", b"h"]
    # No hint: every line is returned.
    self.assertEqual(make_pair().readlines(), all_lines)
    # A hint of None is documented as "no hint" as well.  (The original
    # repeated the no-argument call here, which tested nothing new.)
    self.assertEqual(make_pair().readlines(None), all_lines)
    # With hint=5, reading stops once the lines read exceed 5 bytes.
    self.assertEqual(make_pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001957
def test_read1(self):
    # .read1() is delegated to the underlying reader object, so this test
    # can be shallow.
    source = self.BytesIO(b"abcdef")
    pair = self.tp(source, self.MockRawIO())

    first = pair.read1(3)
    self.assertEqual(first, b"abc")
    rest = pair.read1()
    self.assertEqual(rest, b"def")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001965
def test_readinto(self):
    # readinto() and readinto1() must each fill a 5-byte buffer from a
    # fresh pair.
    for method in ("readinto", "readinto1"):
        with self.subTest(method):
            pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
            fill = getattr(pair, method)

            target = byteslike(b'\0' * 5)
            self.assertEqual(fill(target), 5)
            self.assertEqual(bytes(target), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001974
def test_write(self):
    # Writes go to the writer side; each flush() yields one raw write.
    sink = self.MockRawIO()
    pair = self.tp(self.MockRawIO(), sink)

    pair.write(b"abc")
    pair.flush()
    scratch = bytearray(b"def")
    pair.write(scratch)
    # Overwrite our copy of the data; the flushed output must still
    # contain the bytes that were written.
    scratch[:] = b"***"
    pair.flush()
    self.assertEqual(sink._write_stack, [b"abc", b"def"])
1986
def test_peek(self):
    # peek() may return more than requested, so only its prefix is
    # checked; it must not consume what read() returns afterwards.
    pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

    peeked = pair.peek(3)
    self.assertTrue(peeked.startswith(b"abc"))
    self.assertEqual(pair.read(3), b"abc")
1992
def test_readable(self):
    # A pair built from two MockRawIO objects reports readable().
    reader = self.MockRawIO()
    writer = self.MockRawIO()
    pair = self.tp(reader, writer)
    self.assertTrue(pair.readable())
1996
def test_writeable(self):
    # A pair built from two MockRawIO objects reports writable().
    reader = self.MockRawIO()
    writer = self.MockRawIO()
    pair = self.tp(reader, writer)
    self.assertTrue(pair.writable())
2000
def test_seekable(self):
    # BufferedRWPairs are never seekable, even if their readers and writers
    # are.
    reader = self.MockRawIO()
    writer = self.MockRawIO()
    pair = self.tp(reader, writer)
    self.assertFalse(pair.seekable())
2006
2007 # .flush() is delegated to the underlying writer object and has been
2008 # tested in the test_write method.
2009
def test_close_and_closed(self):
    # closed flips from False to True across close().
    reader = self.MockRawIO()
    writer = self.MockRawIO()
    pair = self.tp(reader, writer)
    self.assertFalse(pair.closed)
    pair.close()
    self.assertTrue(pair.closed)
2015
def test_reader_close_error_on_close(self):
    # The reader's close() fails with a NameError; the error must reach
    # the caller while the writer side is still closed.
    def reader_close():
        reader_non_existing

    reader = self.MockRawIO()
    reader.close = reader_close
    writer = self.MockRawIO()
    pair = self.tp(reader, writer)
    with self.assertRaises(NameError) as caught:
        pair.close()
    self.assertIn('reader_non_existing', str(caught.exception))
    self.assertTrue(pair.closed)
    self.assertFalse(reader.closed)
    self.assertTrue(writer.closed)
2029
def test_writer_close_error_on_close(self):
    # The writer's close() fails with a NameError; the error must reach
    # the caller while the reader side is still closed.
    def writer_close():
        writer_non_existing

    reader = self.MockRawIO()
    writer = self.MockRawIO()
    writer.close = writer_close
    pair = self.tp(reader, writer)
    with self.assertRaises(NameError) as caught:
        pair.close()
    self.assertIn('writer_non_existing', str(caught.exception))
    self.assertFalse(pair.closed)
    self.assertTrue(reader.closed)
    self.assertFalse(writer.closed)
2043
def test_reader_writer_close_error_on_close(self):
    # Both close() methods fail: the reader's error is the one raised
    # and the writer's error is attached as its __context__.
    def reader_close():
        reader_non_existing

    def writer_close():
        writer_non_existing

    reader = self.MockRawIO()
    reader.close = reader_close
    writer = self.MockRawIO()
    writer.close = writer_close
    pair = self.tp(reader, writer)
    with self.assertRaises(NameError) as caught:
        pair.close()
    raised = caught.exception
    self.assertIn('reader_non_existing', str(raised))
    self.assertIsInstance(raised.__context__, NameError)
    self.assertIn('writer_non_existing', str(raised.__context__))
    self.assertFalse(pair.closed)
    self.assertFalse(reader.closed)
    self.assertFalse(writer.closed)
2062
def test_isatty(self):
    class SelectableIsAtty(MockRawIO):
        # Raw mock whose isatty() answer is fixed at construction time.
        def __init__(self, isatty):
            MockRawIO.__init__(self)
            self._isatty = isatty

        def isatty(self):
            return self._isatty

    # The pair is expected to report a tty when either side does.
    cases = (
        (False, False, self.assertFalse),
        (True, False, self.assertTrue),
        (False, True, self.assertTrue),
        (True, True, self.assertTrue),
    )
    for reader_tty, writer_tty, check in cases:
        pair = self.tp(SelectableIsAtty(reader_tty),
                       SelectableIsAtty(writer_tty))
        check(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00002083
def test_weakref_clearing(self):
    # Drop the pair first and its weak reference second; there is
    # nothing to assert, the test passes if the interpreter survives.
    pair = self.tp(self.MockRawIO(), self.MockRawIO())
    weak = weakref.ref(pair)
    pair = None
    weak = None  # Shouldn't segfault.
2089
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared pair tests with tp bound to io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002092
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared pair tests with tp bound to pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002095
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002096
2097class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
2098 read_mode = "rb+"
2099 write_mode = "wb+"
2100
def test_constructor(self):
    # Run both parents' constructor checks explicitly, reader first.
    for parent in (BufferedReaderTest, BufferedWriterTest):
        parent.test_constructor(self)
2104
Serhiy Storchaka61e24932014-02-12 10:52:35 +02002105 def test_uninitialized(self):
2106 BufferedReaderTest.test_uninitialized(self)
2107 BufferedWriterTest.test_uninitialized(self)
2108
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002109 def test_read_and_write(self):
2110 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00002111 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00002112
2113 self.assertEqual(b"as", rw.read(2))
2114 rw.write(b"ddd")
2115 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002116 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002117 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002118 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00002119
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002120 def test_seek_and_tell(self):
2121 raw = self.BytesIO(b"asdfghjkl")
2122 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00002123
Ezio Melottib3aedd42010-11-20 19:04:17 +00002124 self.assertEqual(b"as", rw.read(2))
2125 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002126 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002127 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00002128
Antoine Pitroue05565e2011-08-20 14:39:23 +02002129 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00002130 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02002131 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002132 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002133 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002134 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002135 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002136 self.assertEqual(7, rw.tell())
2137 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02002138 rw.flush()
2139 self.assertEqual(b"asdf123fl", raw.getvalue())
2140
Christian Heimes8e42a0a2007-11-08 18:04:45 +00002141 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00002142
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002143 def check_flush_and_read(self, read_func):
2144 raw = self.BytesIO(b"abcdefghi")
2145 bufio = self.tp(raw)
2146
Ezio Melottib3aedd42010-11-20 19:04:17 +00002147 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002148 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002149 self.assertEqual(b"ef", read_func(bufio, 2))
2150 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002151 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002152 self.assertEqual(6, bufio.tell())
2153 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002154 raw.seek(0, 0)
2155 raw.write(b"XYZ")
2156 # flush() resets the read buffer
2157 bufio.flush()
2158 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002159 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002160
2161 def test_flush_and_read(self):
2162 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2163
2164 def test_flush_and_readinto(self):
2165 def _readinto(bufio, n=-1):
2166 b = bytearray(n if n >= 0 else 9999)
2167 n = bufio.readinto(b)
2168 return bytes(b[:n])
2169 self.check_flush_and_read(_readinto)
2170
2171 def test_flush_and_peek(self):
2172 def _peek(bufio, n=-1):
2173 # This relies on the fact that the buffer can contain the whole
2174 # raw stream, otherwise peek() can return less.
2175 b = bufio.peek(n)
2176 if n != -1:
2177 b = b[:n]
2178 bufio.seek(len(b), 1)
2179 return b
2180 self.check_flush_and_read(_peek)
2181
2182 def test_flush_and_write(self):
2183 raw = self.BytesIO(b"abcdefghi")
2184 bufio = self.tp(raw)
2185
2186 bufio.write(b"123")
2187 bufio.flush()
2188 bufio.write(b"45")
2189 bufio.flush()
2190 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002191 self.assertEqual(b"12345fghi", raw.getvalue())
2192 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002193
2194 def test_threads(self):
2195 BufferedReaderTest.test_threads(self)
2196 BufferedWriterTest.test_threads(self)
2197
2198 def test_writes_and_peek(self):
2199 def _peek(bufio):
2200 bufio.peek(1)
2201 self.check_writes(_peek)
2202 def _peek(bufio):
2203 pos = bufio.tell()
2204 bufio.seek(-1, 1)
2205 bufio.peek(1)
2206 bufio.seek(pos, 0)
2207 self.check_writes(_peek)
2208
2209 def test_writes_and_reads(self):
2210 def _read(bufio):
2211 bufio.seek(-1, 1)
2212 bufio.read(1)
2213 self.check_writes(_read)
2214
2215 def test_writes_and_read1s(self):
2216 def _read1(bufio):
2217 bufio.seek(-1, 1)
2218 bufio.read1(1)
2219 self.check_writes(_read1)
2220
2221 def test_writes_and_readintos(self):
2222 def _read(bufio):
2223 bufio.seek(-1, 1)
2224 bufio.readinto(bytearray(1))
2225 self.check_writes(_read)
2226
Antoine Pitroua0ceb732009-08-06 20:29:56 +00002227 def test_write_after_readahead(self):
2228 # Issue #6629: writing after the buffer was filled by readahead should
2229 # first rewind the raw stream.
2230 for overwrite_size in [1, 5]:
2231 raw = self.BytesIO(b"A" * 10)
2232 bufio = self.tp(raw, 4)
2233 # Trigger readahead
2234 self.assertEqual(bufio.read(1), b"A")
2235 self.assertEqual(bufio.tell(), 1)
2236 # Overwriting should rewind the raw stream if it needs so
2237 bufio.write(b"B" * overwrite_size)
2238 self.assertEqual(bufio.tell(), overwrite_size + 1)
2239 # If the write size was smaller than the buffer size, flush() and
2240 # check that rewind happens.
2241 bufio.flush()
2242 self.assertEqual(bufio.tell(), overwrite_size + 1)
2243 s = raw.getvalue()
2244 self.assertEqual(s,
2245 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2246
Antoine Pitrou7c404892011-05-13 00:13:33 +02002247 def test_write_rewind_write(self):
2248 # Various combinations of reading / writing / seeking backwards / writing again
2249 def mutate(bufio, pos1, pos2):
2250 assert pos2 >= pos1
2251 # Fill the buffer
2252 bufio.seek(pos1)
2253 bufio.read(pos2 - pos1)
2254 bufio.write(b'\x02')
2255 # This writes earlier than the previous write, but still inside
2256 # the buffer.
2257 bufio.seek(pos1)
2258 bufio.write(b'\x01')
2259
2260 b = b"\x80\x81\x82\x83\x84"
2261 for i in range(0, len(b)):
2262 for j in range(i, len(b)):
2263 raw = self.BytesIO(b)
2264 bufio = self.tp(raw, 100)
2265 mutate(bufio, i, j)
2266 bufio.flush()
2267 expected = bytearray(b)
2268 expected[j] = 2
2269 expected[i] = 1
2270 self.assertEqual(raw.getvalue(), expected,
2271 "failed result for i=%d, j=%d" % (i, j))
2272
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00002273 def test_truncate_after_read_or_write(self):
2274 raw = self.BytesIO(b"A" * 10)
2275 bufio = self.tp(raw, 100)
2276 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2277 self.assertEqual(bufio.truncate(), 2)
2278 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2279 self.assertEqual(bufio.truncate(), 4)
2280
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002281 def test_misbehaved_io(self):
2282 BufferedReaderTest.test_misbehaved_io(self)
2283 BufferedWriterTest.test_misbehaved_io(self)
2284
Antoine Pitroue05565e2011-08-20 14:39:23 +02002285 def test_interleaved_read_write(self):
2286 # Test for issue #12213
2287 with self.BytesIO(b'abcdefgh') as raw:
2288 with self.tp(raw, 100) as f:
2289 f.write(b"1")
2290 self.assertEqual(f.read(1), b'b')
2291 f.write(b'2')
2292 self.assertEqual(f.read1(1), b'd')
2293 f.write(b'3')
2294 buf = bytearray(1)
2295 f.readinto(buf)
2296 self.assertEqual(buf, b'f')
2297 f.write(b'4')
2298 self.assertEqual(f.peek(1), b'h')
2299 f.flush()
2300 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2301
2302 with self.BytesIO(b'abc') as raw:
2303 with self.tp(raw, 100) as f:
2304 self.assertEqual(f.read(1), b'a')
2305 f.write(b"2")
2306 self.assertEqual(f.read(1), b'c')
2307 f.flush()
2308 self.assertEqual(raw.getvalue(), b'a2c')
2309
2310 def test_interleaved_readline_write(self):
2311 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2312 with self.tp(raw) as f:
2313 f.write(b'1')
2314 self.assertEqual(f.readline(), b'b\n')
2315 f.write(b'2')
2316 self.assertEqual(f.readline(), b'def\n')
2317 f.write(b'3')
2318 self.assertEqual(f.readline(), b'\n')
2319 f.flush()
2320 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2321
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002322 # You can't construct a BufferedRandom over a non-seekable stream.
2323 test_unseekable = None
2324
R David Murray67bfe802013-02-23 21:51:05 -05002325
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    """Run the BufferedRandom tests against ``io.BufferedRandom``."""

    tp = io.BufferedRandom

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-initialising with an absurd buffer size must fail cleanly.
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            bufio.__init__(rawio, sys.maxsize)

    def test_garbage_collection(self):
        for suite in (CBufferedReaderTest, CBufferedWriterTest):
            suite.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError message must name
        # BufferedRandom.
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
2349
2350
class PyBufferedRandomTest(BufferedRandomTest):
    """Run the BufferedRandom tests against ``pyio.BufferedRandom``."""

    tp = pyio.BufferedRandom
2353
2354
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002355# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2356# properties:
2357# - A single output character can correspond to many bytes of input.
2358# - The number of input bytes to complete the character can be
2359# undetermined until the last input byte is received.
2360# - The number of input bytes can vary depending on previous input.
2361# - A single input byte can correspond to many characters of output.
2362# - The number of output characters can be undetermined until the
2363# last input byte is received.
2364# - The number of output characters can vary depending on previous input.
2365
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  A new I or O applies to the bytes
    that follow the word that set it.  EOF (final=True) also terminates
    the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I=1, O=1, no buffered input."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return ``(buffered_bytes, flags)``.

        I and O are each XORed with 1 and packed as ``i*100 + o``, so that
        flags = 0 after reset().
        """
        i, o = self.i ^ 1, self.o ^ 1
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume the bytes of *input* and return the text they complete.

        Bytes of an unfinished word stay in ``self.buffer``; when *final*
        is true, a pending word is emitted as if it had been terminated.
        """
        pieces = []
        period = ord('.')
        for b in input:
            # self.i is re-read for every byte because process_word() may
            # change it in the middle of the input.
            if self.i == 0:  # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output.

        'i...' and 'o...' words update I / O and produce no output; any
        other word is decoded as ASCII, fitted to O characters (unless O
        is 0) and followed by a period.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if self.o:
                # Pad with hyphens, then truncate, to exactly O characters.
                output = output.ljust(self.o, '-')[:self.o]
            output += '.'
        self.buffer = bytearray()
        return output

    # The codec search function below only answers while this is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the 'test_decoder' codec.

        Returns a CodecInfo that encodes like latin-1 and decodes
        incrementally with this class, or None when the codec is disabled
        or *name* is some other codec.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
2450
# Hook the decoder above into the codec registry under 'test_decoder'.
# The search function stays inert until a test sets
# StatefulIncrementalDecoder.codecEnabled.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2454
2455
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Sanity checks for the StatefulIncrementalDecoder helper itself.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # One-shot decoding: a fresh decoder per case.
        for data, eof, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, eof), expected)

        # An unfinished word yields nothing until EOF is forced.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002498
2499class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002500
def setUp(self):
    # Sample data mixing "\r\n", "\r" and "\n" line endings, plus the
    # same text with every ending normalised to "\n".
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
    # Start each test without a leftover scratch file.
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002505
def tearDown(self):
    # Remove the scratch file a test may have created.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002508
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002509 def test_constructor(self):
2510 r = self.BytesIO(b"\xc3\xa9\n\n")
2511 b = self.BufferedReader(r, 1000)
2512 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002513 t.__init__(b, encoding="latin-1", newline="\r\n")
2514 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002515 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002516 t.__init__(b, encoding="utf-8", line_buffering=True)
2517 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002518 self.assertEqual(t.line_buffering, True)
2519 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002520 self.assertRaises(TypeError, t.__init__, b, newline=42)
2521 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2522
Serhiy Storchakaf24131f2015-04-16 11:19:43 +03002523 def test_uninitialized(self):
2524 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2525 del t
2526 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2527 self.assertRaises(Exception, repr, t)
2528 self.assertRaisesRegex((ValueError, AttributeError),
2529 'uninitialized|has no attribute',
2530 t.read, 0)
2531 t.__init__(self.MockRawIO())
2532 self.assertEqual(t.read(0), '')
2533
Nick Coghlana9b15242014-02-04 22:11:18 +10002534 def test_non_text_encoding_codecs_are_rejected(self):
2535 # Ensure the constructor complains if passed a codec that isn't
2536 # marked as a text encoding
2537 # http://bugs.python.org/issue20404
2538 r = self.BytesIO()
2539 b = self.BufferedWriter(r)
2540 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2541 self.TextIOWrapper(b, encoding="hex")
2542
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002543 def test_detach(self):
2544 r = self.BytesIO()
2545 b = self.BufferedWriter(r)
2546 t = self.TextIOWrapper(b)
2547 self.assertIs(t.detach(), b)
2548
2549 t = self.TextIOWrapper(b, encoding="ascii")
2550 t.write("howdy")
2551 self.assertFalse(r.getvalue())
2552 t.detach()
2553 self.assertEqual(r.getvalue(), b"howdy")
2554 self.assertRaises(ValueError, t.detach)
2555
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002556 # Operations independent of the detached stream should still work
2557 repr(t)
2558 self.assertEqual(t.encoding, "ascii")
2559 self.assertEqual(t.errors, "strict")
2560 self.assertFalse(t.line_buffering)
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002561 self.assertFalse(t.write_through)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002562
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002563 def test_repr(self):
2564 raw = self.BytesIO("hello".encode("utf-8"))
2565 b = self.BufferedReader(raw)
2566 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002567 modname = self.TextIOWrapper.__module__
2568 self.assertEqual(repr(t),
2569 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2570 raw.name = "dummy"
2571 self.assertEqual(repr(t),
2572 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002573 t.mode = "r"
2574 self.assertEqual(repr(t),
2575 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002576 raw.name = b"dummy"
2577 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002578 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002579
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002580 t.buffer.detach()
2581 repr(t) # Should not raise an exception
2582
def test_recursive_repr(self):
    # Issue #25455
    # Make the wrapper its own stream's name, so computing the repr
    # recurses; a RuntimeError is acceptable, a crash is not.
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw)
    with support.swap_attr(raw, 'name', wrapper):
        try:
            repr(wrapper)  # Should not crash
        except RuntimeError:
            pass
2592
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002593 def test_line_buffering(self):
2594 r = self.BytesIO()
2595 b = self.BufferedWriter(r, 1000)
2596 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002597 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002598 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002599 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002600 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002601 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002602 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002603
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002604 def test_reconfigure_line_buffering(self):
2605 r = self.BytesIO()
2606 b = self.BufferedWriter(r, 1000)
2607 t = self.TextIOWrapper(b, newline="\n", line_buffering=False)
2608 t.write("AB\nC")
2609 self.assertEqual(r.getvalue(), b"")
2610
2611 t.reconfigure(line_buffering=True) # implicit flush
2612 self.assertEqual(r.getvalue(), b"AB\nC")
2613 t.write("DEF\nG")
2614 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2615 t.write("H")
2616 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2617 t.reconfigure(line_buffering=False) # implicit flush
2618 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2619 t.write("IJ")
2620 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2621
2622 # Keeping default value
2623 t.reconfigure()
2624 t.reconfigure(line_buffering=None)
2625 self.assertEqual(t.line_buffering, False)
2626 t.reconfigure(line_buffering=True)
2627 t.reconfigure()
2628 t.reconfigure(line_buffering=None)
2629 self.assertEqual(t.line_buffering, True)
2630
@unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
def test_default_encoding(self):
    saved_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding to check that TextIOWrapper() uses the current
        # locale encoding and not the user preferred encoding
        for name in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(name, None)

        expected = locale.getpreferredencoding(False)
        wrapper = self.TextIOWrapper(self.BytesIO())
        self.assertEqual(wrapper.encoding, expected)
    finally:
        # Put the environment back exactly as it was.
        os.environ.clear()
        os.environ.update(saved_environ)
2649
@support.cpython_only
@unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
def test_device_encoding(self):
    # Issue 15989
    # A fileno() just beyond INT_MAX, then just beyond UINT_MAX, must
    # make the constructor raise OverflowError.
    import _testcapi
    raw = self.BytesIO()
    for limit in (_testcapi.INT_MAX, _testcapi.UINT_MAX):
        raw.fileno = lambda limit=limit: limit + 1
        with self.assertRaises(OverflowError):
            self.TextIOWrapper(raw)
2660
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002661 def test_encoding(self):
2662 # Check the encoding attribute is always set, and valid
2663 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002664 t = self.TextIOWrapper(b, encoding="utf-8")
2665 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002666 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002667 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002668 codecs.lookup(t.encoding)
2669
2670 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002671 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002672 b = self.BytesIO(b"abc\n\xff\n")
2673 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002674 self.assertRaises(UnicodeError, t.read)
2675 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002676 b = self.BytesIO(b"abc\n\xff\n")
2677 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002678 self.assertRaises(UnicodeError, t.read)
2679 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002680 b = self.BytesIO(b"abc\n\xff\n")
2681 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002682 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002683 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002684 b = self.BytesIO(b"abc\n\xff\n")
2685 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002686 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002687
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002688 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002689 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002690 b = self.BytesIO()
2691 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002692 self.assertRaises(UnicodeError, t.write, "\xff")
2693 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002694 b = self.BytesIO()
2695 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002696 self.assertRaises(UnicodeError, t.write, "\xff")
2697 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002698 b = self.BytesIO()
2699 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002700 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002701 t.write("abc\xffdef\n")
2702 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002703 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002704 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002705 b = self.BytesIO()
2706 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002707 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002708 t.write("abc\xffdef\n")
2709 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002710 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002711
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002712 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002713 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2714
2715 tests = [
2716 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002717 [ '', input_lines ],
2718 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2719 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2720 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002721 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002722 encodings = (
2723 'utf-8', 'latin-1',
2724 'utf-16', 'utf-16-le', 'utf-16-be',
2725 'utf-32', 'utf-32-le', 'utf-32-be',
2726 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002727
Guido van Rossum8358db22007-08-18 21:39:55 +00002728 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002729 # character in TextIOWrapper._pending_line.
2730 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002731 # XXX: str.encode() should return bytes
2732 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002733 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002734 for bufsize in range(1, 10):
2735 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002736 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2737 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002738 encoding=encoding)
2739 if do_reads:
2740 got_lines = []
2741 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002742 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002743 if c2 == '':
2744 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002745 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002746 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002747 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002748 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002749
2750 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002751 self.assertEqual(got_line, exp_line)
2752 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002753
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002754 def test_newlines_input(self):
2755 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002756 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2757 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002758 (None, normalized.decode("ascii").splitlines(keepends=True)),
2759 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002760 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2761 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2762 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002763 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002764 buf = self.BytesIO(testdata)
2765 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002766 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002767 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002768 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002769
def test_newlines_output(self):
    # Written "\n" characters are translated according to ``newline``;
    # the expected bytes for each setting are listed below.
    expected_by_newline = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is expected to match the entry for os.linesep.
    cases = [(None, expected_by_newline[os.linesep])]
    cases.extend(sorted(expected_by_newline.items()))
    # The text is written in pieces that split a line in the middle.
    pieces = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in cases:
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding="ascii",
                                     newline=newline)
        for piece in pieces:
            wrapper.write(piece)
        wrapper.flush()
        self.assertEqual(raw.closed, False)
        self.assertEqual(raw.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002787
def test_destructor(self):
    # Dropping the last reference to the wrapper must flush the pending
    # text before the underlying buffer's close() runs.
    seen_at_close = []
    parent = self.BytesIO

    class RecordingBytesIO(parent):
        def close(self):
            # Capture the buffer contents at the moment close() is called.
            seen_at_close.append(self.getvalue())
            parent.close(self)

    raw = RecordingBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002801
def test_override_destructor(self):
    # A subclass overriding __del__(), close() and flush() must see them
    # called in exactly that order when the wrapper is collected.
    calls = []

    class TracingTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The base class may not define __del__ at all.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    raw = self.BytesIO()
    wrapper = TracingTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2824
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    raw = self.CloseFailureIO()

    def touch_missing_attribute():
        # The wrapper is never bound to a name, so only this expression
        # keeps it alive while the AttributeError is raised.
        self.TextIOWrapper(raw).xyzzy

    with support.captured_output("stderr") as stderr:
        with self.assertRaises(AttributeError):
            touch_missing_attribute()
    output = stderr.getvalue().strip()
    if not output:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(output.splitlines()), 1)
    self.assertTrue(output.startswith("Exception OSError: "), output)
    self.assertTrue(output.endswith(" ignored"), output)
Guido van Rossum8358db22007-08-18 21:39:55 +00002839
Guido van Rossum9b76da62007-04-11 01:09:03 +00002840 # Systematic tests of the text I/O API
2841
def test_basic_io(self):
    # Exercise write/read/seek/tell round trips on a real file for
    # several encodings and a range of wrapper chunk sizes.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        # NOTE: "utf-16-be" and "utf-16-le" are disabled in this list.
        for enc in "ascii", "latin-1", "utf-8":
            # Context managers guarantee the file is closed even when an
            # assertion fails part-way through, so no handle is leaked.
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Opaque position of the end of the file.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                # Reads at end of file keep returning the empty string.
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                # Append, then read back only the appended text.
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2870
def multi_line_test(self, f, enc):
    # Write lines of many different lengths to ``f``, recording tell()
    # before each write, then check that reading the file back yields
    # the same (position, line) pairs.  ``enc`` is not used here.
    f.seek(0)
    f.truncate()
    alphabet = "s\xff\u0fff\uffff"
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build ``size`` chars.
        line = "".join(alphabet[i % len(alphabet)]
                       for i in range(size)) + "\n"
        written.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    read_back = []
    pos = f.tell()
    line = f.readline()
    while line:
        read_back.append((pos, line))
        pos = f.tell()
        line = f.readline()
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002892
def test_telling(self):
    # tell() must report the same positions while reading lines back as
    # it did while they were written, and is refused during iteration.
    # The ``with`` block closes the file even if an assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # While the file is being iterated, tell() raises OSError.
            self.assertRaises(OSError, f.tell)
        # Once iteration is exhausted, tell() works again.
        self.assertEqual(f.tell(), p2)
2912
def test_seeking(self):
    # Build a line whose 3-byte UTF-8 character starts two bytes before
    # the value returned by _default_chunk_size() (presumably the
    # wrapper's default chunk size -- confirm against its definition).
    prefix_size = _default_chunk_size() - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    # The prefix is pure ASCII: one byte per character.
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    line = prefix + u_suffix.encode("utf-8")
    with self.open(support.TESTFN, "wb") as out:
        out.write(line * 2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        head = f.read(prefix_size)
        self.assertEqual(head, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002929
def test_seeking_too(self):
    # Regression test for a specific bug
    # The payload is one 3-byte UTF-8 character followed by a newline.
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as out:
        out.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as reader:
        reader._CHUNK_SIZE  # Just test that it exists
        # A chunk size smaller than the character's encoded length.
        reader._CHUNK_SIZE = 2
        reader.readline()
        # Only checks that tell() does not raise.
        reader.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002940
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened in a ``with`` block so that it is closed
        # even when one of the assertions below fails.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Seeking back to the cookie must restore the
                    # decoder so the rest of the stream reads the same.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for raw, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(raw)

        # Position each test case so that it crosses a chunk boundary.
        for raw, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(raw)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + raw, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002987
def test_multibyte_seek_and_tell(self):
    # A position returned by tell() between two lines of an euc_jp file
    # must be usable with seek() to re-read the second line.
    # ``with`` blocks close the file even if an assertion fails.
    with self.open(support.TESTFN, "w", encoding="euc_jp") as f:
        f.write("AB\n\u3046\u3048\n")

    with self.open(support.TESTFN, "r", encoding="euc_jp") as f:
        self.assertEqual(f.readline(), "AB\n")
        p0 = f.tell()
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        p1 = f.tell()
        f.seek(p0)
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        self.assertEqual(f.tell(), p1)
3002
def test_seek_with_encoder_state(self):
    # Seek back to a position taken with tell() in the middle of the
    # writes, write a different character there, and check the result.
    # ``with`` blocks close the file even if an assertion fails.
    with self.open(support.TESTFN, "w", encoding="euc_jis_2004") as f:
        f.write("\u00e6\u0300")
        p0 = f.tell()
        f.write("\u00e6")
        f.seek(p0)
        f.write("\u0300")

    with self.open(support.TESTFN, "r", encoding="euc_jis_2004") as f:
        self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
3015
def test_encoded_writes(self):
    text = "1234567890"
    doubled = text * 2
    encodings = ("utf-16", "utf-16-le", "utf-16-be",
                 "utf-32", "utf-32-le", "utf-32-be")
    for encoding in encodings:
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding=encoding)
        # Check if the BOM is written only once (see issue1753).
        wrapper.write(text)
        wrapper.write(text)
        # Rewind and read the whole text back, twice in a row.
        for _ in range(2):
            wrapper.seek(0)
            self.assertEqual(wrapper.read(), doubled)
        self.assertEqual(raw.getvalue(), doubled.encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003035
def test_unreadable(self):
    # read() on a wrapper whose buffer reports readable() == False
    # must raise OSError.
    class WriteOnlyBytesIO(self.BytesIO):
        def readable(self):
            return False

    wrapper = self.TextIOWrapper(WriteOnlyBytesIO())
    with self.assertRaises(OSError):
        wrapper.read()
Benjamin Petersona1b49012009-03-31 23:11:32 +00003042
def test_read_one_by_one(self):
    # Reading one character at a time must still turn "\r\n" into "\n".
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    # iter() with a sentinel keeps calling read(1) until it returns "".
    collected = "".join(iter(lambda: wrapper.read(1), ""))
    self.assertEqual(collected, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003052
def test_readlines(self):
    # readlines() with no argument or None returns every line; with a
    # hint of 5 only the first two lines are expected.
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]
    self.assertEqual(wrapper.readlines(), every_line)
    wrapper.seek(0)
    self.assertEqual(wrapper.readlines(None), every_line)
    wrapper.seek(0)
    self.assertEqual(wrapper.readlines(5), every_line[:2])
3060
# Read in amounts equal to TextIOWrapper._CHUNK_SIZE, pinned to 128 here.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    # Pin the chunk size explicitly: the test used to rely on it being
    # 128 without ever setting it, so the "\r" and "\n" were not
    # guaranteed to land in different chunks as the comments intend.
    txt._CHUNK_SIZE = 128
    reads = ""
    while True:
        c = txt.read(128)
        if not c:
            break
        reads += c
    self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003072
def test_writelines(self):
    # writelines() writes each item of a list with nothing in between.
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw)
    wrapper.writelines(['ab', 'cd', 'ef'])
    wrapper.flush()
    self.assertEqual(raw.getvalue(), b'abcdef')
3080
def test_writelines_userlist(self):
    # writelines() also accepts a UserList, not just a real list.
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw)
    wrapper.writelines(UserList(['ab', 'cd', 'ef']))
    wrapper.flush()
    self.assertEqual(raw.getvalue(), b'abcdef')
3088
def test_writelines_error(self):
    # writelines() must raise TypeError for a list of ints, for None
    # and for a bytes object.
    wrapper = self.TextIOWrapper(self.BytesIO())
    for bad_lines in ([1, 2, 3], None, b'abc'):
        with self.assertRaises(TypeError):
            wrapper.writelines(bad_lines)
3094
def test_issue1395_1(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")

    # read one char at a time
    # (iter() with a sentinel stops at the first empty read)
    collected = "".join(iter(lambda: wrapper.read(1), ""))
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003106
def test_issue1395_2(self):
    # Read four characters at a time with a chunk size of four.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # iter() with a sentinel stops at the first empty read.
    collected = "".join(iter(lambda: wrapper.read(4), ""))
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003118
def test_issue1395_3(self):
    # Mix two fixed-size reads with three readline() calls.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    parts = [wrapper.read(4), wrapper.read(4)]
    for _ in range(3):
        parts.append(wrapper.readline())
    self.assertEqual("".join(parts), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003129
def test_issue1395_4(self):
    # A fixed-size read followed by a read of everything that is left.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    head = wrapper.read(4)
    rest = wrapper.read()
    self.assertEqual(head + rest, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003137
def test_issue1395_5(self):
    # A position taken with tell() after a read must still be valid
    # after seeking away to the start and back again.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    wrapper.read(4)
    bookmark = wrapper.tell()
    wrapper.seek(0)
    wrapper.seek(bookmark)
    self.assertEqual(wrapper.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003147
def test_issue2282(self):
    # The wrapper must report the same seekability as its buffer.
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    self.assertEqual(raw.seekable(), wrapper.seekable())
3153
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN

    def file_bytes():
        # Return the raw bytes currently stored in the test file.
        with self.open(path, 'rb') as raw:
            return raw.read()

    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as f:
            f.write('aaa')
            f.tell()
        self.assertEqual(file_bytes(), 'aaa'.encode(charset))

        with self.open(path, 'a', encoding=charset) as f:
            f.write('xxx')
        self.assertEqual(file_bytes(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003168
def test_seek_bom(self):
    # The BOM must not be written again when seeking manually and then
    # writing into a file that already has content.
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as f:
            f.write('aaa')
            end_of_text = f.tell()
        with self.open(path, 'r+', encoding=charset) as f:
            # Write after the existing text first, then overwrite it.
            f.seek(end_of_text)
            f.write('zzz')
            f.seek(0)
            f.write('bbb')
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003183
def test_seek_append_bom(self):
    # The BOM must not be written again when a file opened for append
    # is first seeked to the start and then to the end.
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as f:
            f.write('aaa')
        with self.open(path, 'a', encoding=charset) as f:
            f.seek(0)
            f.seek(0, self.SEEK_END)
            f.write('xxx')
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'aaaxxx'.encode(charset))
3196
def test_errors_property(self):
    # ``errors`` reads "strict" when not given, and otherwise reflects
    # the value passed to open().
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for extra_kwargs, expected in cases:
        with self.open(support.TESTFN, "w", **extra_kwargs) as f:
            self.assertEqual(f.errors, expected)
3202
@support.no_tracing
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    thread_count = 20

    def line_for(index):
        # The unique line written by thread number ``index``.
        return "Thread%03d\n" % index

    start = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def worker(index):
            # Build the text before waiting on the shared event, so
            # only the write itself happens after it fires.
            text = line_for(index)
            start.wait()
            f.write(text)

        workers = [threading.Thread(target=worker, args=(index,))
                   for index in range(thread_count)]
        with support.start_threads(workers, start.set):
            time.sleep(0.02)
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Every thread's line must appear exactly once.
        for index in range(thread_count):
            self.assertEqual(content.count(line_for(index)), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003220
def test_flush_error_on_close(self):
    # Test that text file is closed despite failed flush
    # and that flush() is called before file closed.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    state_at_flush = []

    def failing_flush():
        # Record whether either layer was already closed when flush()
        # ran, then fail.
        state_at_flush[:] = [wrapper.closed, wrapper.buffer.closed]
        raise OSError()

    wrapper.flush = failing_flush
    with self.assertRaises(OSError):  # exception not swallowed
        wrapper.close()
    self.assertTrue(wrapper.closed)
    self.assertTrue(wrapper.buffer.closed)
    self.assertTrue(state_at_flush)  # flush() called
    wrapper_was_closed, buffer_was_closed = state_at_flush
    self.assertFalse(wrapper_was_closed)  # flush() called before file closed
    self.assertFalse(buffer_was_closed)
    wrapper.flush = lambda: None  # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003237
def test_close_error_on_close(self):
    # When both flush() and the buffer's close() fail, the close error
    # propagates with the flush error chained as its __context__.
    raw = self.BytesIO(self.testdata)

    def failing_flush():
        raise OSError('flush')

    def failing_close():
        raise OSError('close')

    raw.close = failing_close
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.flush = failing_flush
    with self.assertRaises(OSError) as caught:  # exception not swallowed
        wrapper.close()
    error = caught.exception
    self.assertEqual(error.args, ('close',))
    self.assertIsInstance(error.__context__, OSError)
    self.assertEqual(error.__context__.args, ('flush',))
    # The buffer's close() failed, so the wrapper is not closed.
    self.assertFalse(wrapper.closed)
3253
def test_nonnormalized_close_error_on_close(self):
    # Issue #21677
    # Same scenario as a failing flush() plus a failing close(), but
    # both errors are NameErrors triggered by undefined names.
    raw = self.BytesIO(self.testdata)

    def failing_flush():
        # Deliberately undefined name: evaluating it raises NameError.
        raise non_existing_flush

    def failing_close():
        # Deliberately undefined name: evaluating it raises NameError.
        raise non_existing_close

    raw.close = failing_close
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.flush = failing_flush
    with self.assertRaises(NameError) as caught:  # exception not swallowed
        wrapper.close()
    error = caught.exception
    self.assertIn('non_existing_close', str(error))
    self.assertIsInstance(error.__context__, NameError)
    self.assertIn('non_existing_flush', str(error.__context__))
    self.assertFalse(wrapper.closed)
3270
def test_multi_close(self):
    # close() may be called repeatedly; flush() on the closed wrapper
    # must raise ValueError.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    for _ in range(3):
        wrapper.close()
    with self.assertRaises(ValueError):
        wrapper.flush()
3277
def test_unseekable(self):
    # tell() and seek() on a wrapper around MockUnseekableIO must raise
    # UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    unsupported = self.UnsupportedOperation
    with self.assertRaises(unsupported):
        wrapper.tell()
    with self.assertRaises(unsupported):
        wrapper.seek(0)
3282
def test_readonly_attributes(self):
    # The ``buffer`` attribute of a wrapper cannot be reassigned.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    with self.assertRaises(AttributeError):
        wrapper.buffer = replacement
3288
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks),
                                 encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(wrapper.read(4), 'abcd')
    self.assertEqual(wrapper.readline(), 'efghi\n')
    remaining_lines = list(wrapper)
    self.assertEqual(remaining_lines, ['jkl\n', 'opq\n'])
3299
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for piece in ('1', '23\n4', '5'):
        wrapper.write(piece)
    # No flush() was issued, yet everything has reached the raw object.
    self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3309
def test_bufio_write_through(self):
    # Issue #21396: write_through=True doesn't force a flush()
    # on the underlying binary buffered object.
    flush_calls, write_calls = [], []

    class SpyBufferedWriter(self.BufferedWriter):
        # Records every flush() and write() before delegating.
        def flush(self, *args, **kwargs):
            flush_calls.append(True)
            return super().flush(*args, **kwargs)

        def write(self, *args, **kwargs):
            write_calls.append(True)
            return super().write(*args, **kwargs)

    raw = self.BytesIO()
    payload = b"a"
    buffered = SpyBufferedWriter(raw, len(payload) * 2)
    wrapper = self.TextIOWrapper(buffered, encoding='ascii',
                                 write_through=True)
    # write to the buffered io but don't overflow the buffer
    text = payload.decode('ascii')
    wrapper.write(text)

    # buffer.flush is not called with write_through=True
    self.assertFalse(flush_calls)
    # buffer.write *is* called with write_through=True
    self.assertTrue(write_calls)
    self.assertEqual(raw.getvalue(), b"")  # no flush

    write_calls.clear()  # reset
    wrapper.write(text * 10)  # total content is larger than bufio buffer
    self.assertTrue(write_calls)
    self.assertEqual(raw.getvalue(), payload * 11)  # all flushed
3341
def test_reconfigure_write_through(self):
    # reconfigure() can switch write_through on and off; calling it
    # with no argument or with None leaves the setting untouched.
    raw = self.MockRawIO([])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n')

    def written():
        # Everything the raw object has received so far.
        return b''.join(raw._write_stack)

    wrapper.write('1')
    wrapper.reconfigure(write_through=True)  # implied flush
    self.assertEqual(wrapper.write_through, True)
    self.assertEqual(written(), b'1')
    wrapper.write('23')
    self.assertEqual(written(), b'123')
    wrapper.reconfigure(write_through=False)
    self.assertEqual(wrapper.write_through, False)
    wrapper.write('45')
    wrapper.flush()
    self.assertEqual(written(), b'12345')
    # Keeping default value
    wrapper.reconfigure()
    wrapper.reconfigure(write_through=None)
    self.assertEqual(wrapper.write_through, False)
    wrapper.reconfigure(write_through=True)
    wrapper.reconfigure()
    wrapper.reconfigure(write_through=None)
    self.assertEqual(wrapper.write_through, True)
3364
def test_read_nonbytes(self):
    # Issue #17106
    # Crash when underlying read() returns non-bytes
    # Each call gets a fresh wrapper around a StringIO, whose reads
    # return str instead of bytes.
    attempts = (("read", (1,)), ("readline", ()), ("read", ()))
    for method_name, args in attempts:
        wrapper = self.TextIOWrapper(self.StringIO('a'))
        self.assertRaises(TypeError, getattr(wrapper, method_name), *args)
3374
def test_illegal_encoder(self):
    # Issue 31271: Calling write() while the return value of encoder's
    # encode() is invalid shouldn't cause an assertion failure.
    rot13 = codecs.lookup("rot13")
    # Temporarily flag rot13 as a text encoding while the wrapper is
    # constructed.
    with support.swap_attr(rot13, '_is_text_encoding', True):
        # Use the implementation under test (self.TextIOWrapper and
        # self.BytesIO) rather than hard-coding the ``io`` module, so
        # that both the C and the Python versions are exercised.
        t = self.TextIOWrapper(self.BytesIO(b'foo'), encoding="rot13")
    self.assertRaises(TypeError, t.write, 'bar')
3382
def test_illegal_decoder(self):
    # Issue #17106
    def _make_illegal_wrapper():
        # Bypass the early encoding check added in issue 20404
        quopri = codecs.lookup("quopri")
        quopri._is_text_encoding = True
        try:
            return self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
                                      newline='\n', encoding="quopri")
        finally:
            quopri._is_text_encoding = False

    # Crash when decoder returns non-string
    attempts = (("read", (1,)), ("readline", ()), ("read", ()))
    for method_name, args in attempts:
        t = _make_illegal_wrapper()
        self.assertRaises(TypeError, getattr(t, method_name), *args)

    # Issue 31243: calling read() while the return value of decoder's
    # getstate() is invalid should neither crash the interpreter nor
    # raise a SystemError.
    def _make_very_illegal_wrapper(getstate_ret_val):
        class BadDecoder:
            def getstate(self):
                return getstate_ret_val

        # Replacement decoder factory: ignores its argument.
        def _get_bad_decoder(dummy):
            return BadDecoder()

        quopri = codecs.lookup("quopri")
        with support.swap_attr(quopri, 'incrementaldecoder',
                               _get_bad_decoder):
            return _make_illegal_wrapper()

    for bad_state in (42, (), (1, 2)):
        t = _make_very_illegal_wrapper(bad_state)
        self.assertRaises(TypeError, t.read, 42)
3422
def _check_create_at_shutdown(self, **kwargs):
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    # ``kwargs`` are forwarded to the TextIOWrapper constructor inside
    # the child script; the result of assert_python_ok() is returned.
    template = """if 1:
        import codecs
        import {iomod} as io

        # Avoid looking up codecs at shutdown
        codecs.lookup('utf-8')

        class C:
            def __init__(self):
                self.buf = io.BytesIO()
            def __del__(self):
                io.TextIOWrapper(self.buf, **{kwargs})
                print("ok")
        c = C()
        """
    script = template.format(iomod=self.io.__name__, kwargs=kwargs)
    return assert_python_ok("-c", script)
3443
@support.requires_type_collecting
def test_create_at_shutdown_without_encoding(self):
    # Without an explicit encoding the child either prints "ok" or
    # reports the expected shutdown error on stderr.
    _, out, err = self._check_create_at_shutdown()
    if not err:
        self.assertEqual("ok", out.decode().strip())
        return
    # Can error out with a RuntimeError if the module state
    # isn't found.
    self.assertIn(self.shutdown_error, err.decode())
3453
@support.requires_type_collecting
def test_create_at_shutdown_with_encoding(self):
    # With encoding and errors given explicitly the child must print
    # "ok" and leave stderr empty.
    options = {'encoding': 'utf-8', 'errors': 'strict'}
    _rc, stdout, stderr = self._check_create_at_shutdown(**options)
    self.assertFalse(stderr)
    self.assertEqual("ok", stdout.decode().strip())
3460
def test_read_byteslike(self):
    # The underlying stream hands back memoryviews instead of bytes.
    source = MemviewBytesIO(b'Just some random string\n')
    wrapper = self.TextIOWrapper(source, 'utf-8')

    # TextIOwrapper will not read the full string, because
    # we truncate it to a multiple of the native int size
    # so that we can construct a more complex memoryview.
    truncated = _to_memoryview(source.getvalue()).tobytes()
    expected_text = truncated.decode('utf-8')

    self.assertEqual(wrapper.read(200), expected_text)
3471
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003472 def test_issue22849(self):
3473 class F(object):
3474 def readable(self): return True
3475 def writable(self): return True
3476 def seekable(self): return True
3477
3478 for i in range(10):
3479 try:
3480 self.TextIOWrapper(F(), encoding='utf-8')
3481 except Exception:
3482 pass
3483
3484 F.tell = lambda x: 0
3485 t = self.TextIOWrapper(F(), encoding='utf-8')
3486
INADA Naoki507434f2017-12-21 09:59:53 +09003487 def test_reconfigure_encoding_read(self):
3488 # latin1 -> utf8
3489 # (latin1 can decode utf-8 encoded string)
3490 data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3491 raw = self.BytesIO(data)
3492 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3493 self.assertEqual(txt.readline(), 'abc\xe9\n')
3494 with self.assertRaises(self.UnsupportedOperation):
3495 txt.reconfigure(encoding='utf-8')
3496 with self.assertRaises(self.UnsupportedOperation):
3497 txt.reconfigure(newline=None)
3498
3499 def test_reconfigure_write_fromascii(self):
3500 # ascii has a specific encodefunc in the C implementation,
3501 # but utf-8-sig has not. Make sure that we get rid of the
3502 # cached encodefunc when we switch encoders.
3503 raw = self.BytesIO()
3504 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3505 txt.write('foo\n')
3506 txt.reconfigure(encoding='utf-8-sig')
3507 txt.write('\xe9\n')
3508 txt.flush()
3509 self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3510
3511 def test_reconfigure_write(self):
3512 # latin -> utf8
3513 raw = self.BytesIO()
3514 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3515 txt.write('abc\xe9\n')
3516 txt.reconfigure(encoding='utf-8')
3517 self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3518 txt.write('d\xe9f\n')
3519 txt.flush()
3520 self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3521
3522 # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3523 # the file
3524 raw = self.BytesIO()
3525 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3526 txt.write('abc\n')
3527 txt.reconfigure(encoding='utf-8-sig')
3528 txt.write('d\xe9f\n')
3529 txt.flush()
3530 self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3531
3532 def test_reconfigure_write_non_seekable(self):
3533 raw = self.BytesIO()
3534 raw.seekable = lambda: False
3535 raw.seek = None
3536 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3537 txt.write('abc\n')
3538 txt.reconfigure(encoding='utf-8-sig')
3539 txt.write('d\xe9f\n')
3540 txt.flush()
3541
3542 # If the raw stream is not seekable, there'll be a BOM
3543 self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3544
3545 def test_reconfigure_defaults(self):
3546 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
3547 txt.reconfigure(encoding=None)
3548 self.assertEqual(txt.encoding, 'ascii')
3549 self.assertEqual(txt.errors, 'replace')
3550 txt.write('LF\n')
3551
3552 txt.reconfigure(newline='\r\n')
3553 self.assertEqual(txt.encoding, 'ascii')
3554 self.assertEqual(txt.errors, 'replace')
3555
3556 txt.reconfigure(errors='ignore')
3557 self.assertEqual(txt.encoding, 'ascii')
3558 self.assertEqual(txt.errors, 'ignore')
3559 txt.write('CRLF\n')
3560
3561 txt.reconfigure(encoding='utf-8', newline=None)
3562 self.assertEqual(txt.errors, 'strict')
3563 txt.seek(0)
3564 self.assertEqual(txt.read(), 'LF\nCRLF\n')
3565
3566 self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3567
3568 def test_reconfigure_newline(self):
3569 raw = self.BytesIO(b'CR\rEOF')
3570 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3571 txt.reconfigure(newline=None)
3572 self.assertEqual(txt.readline(), 'CR\n')
3573 raw = self.BytesIO(b'CR\rEOF')
3574 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3575 txt.reconfigure(newline='')
3576 self.assertEqual(txt.readline(), 'CR\r')
3577 raw = self.BytesIO(b'CR\rLF\nEOF')
3578 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3579 txt.reconfigure(newline='\n')
3580 self.assertEqual(txt.readline(), 'CR\rLF\n')
3581 raw = self.BytesIO(b'LF\nCR\rEOF')
3582 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3583 txt.reconfigure(newline='\r')
3584 self.assertEqual(txt.readline(), 'LF\nCR\r')
3585 raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3586 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3587 txt.reconfigure(newline='\r\n')
3588 self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3589
3590 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3591 txt.reconfigure(newline=None)
3592 txt.write('linesep\n')
3593 txt.reconfigure(newline='')
3594 txt.write('LF\n')
3595 txt.reconfigure(newline='\n')
3596 txt.write('LF\n')
3597 txt.reconfigure(newline='\r')
3598 txt.write('CR\n')
3599 txt.reconfigure(newline='\r\n')
3600 txt.write('CRLF\n')
3601 expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3602 self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3603
Zackery Spytz23db9352018-06-29 04:14:58 -06003604 def test_issue25862(self):
3605 # Assertion failures occurred in tell() after read() and write().
3606 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3607 t.read(1)
3608 t.read()
3609 t.tell()
3610 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3611 t.read(1)
3612 t.write('x')
3613 t.tell()
3614
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003615
class MemviewBytesIO(io.BytesIO):
    '''BytesIO variant whose read() and read1() hand back memoryviews
    (built by _to_memoryview) instead of bytes objects'''

    def read1(self, len_):
        chunk = super().read1(len_)
        return _to_memoryview(chunk)

    def read(self, len_):
        chunk = super().read(len_)
        return _to_memoryview(chunk)
3625
3626def _to_memoryview(buf):
3627 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3628
3629 arr = array.array('i')
3630 idx = len(buf) - len(buf) % arr.itemsize
3631 arr.frombytes(buf[:idx])
3632 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003633
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003634
class CTextIOWrapperTest(TextIOWrapperTest):
    # Implementation module exercised by the inherited tests.
    io = io
    # Text looked for on stderr by the create-at-shutdown test.
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        # Re-initialising with a bogus newline must fail, and reading
        # from the wrapper afterwards must fail too.
        with self.assertRaises(ValueError):
            wrapper.__init__(buffered, newline='xyzzy')
        with self.assertRaises(ValueError):
            wrapper.read()

        # repr() of an object that never went through __init__ raises.
        wrapper = self.TextIOWrapper.__new__(self.TextIOWrapper)
        with self.assertRaises(Exception):
            repr(wrapper)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            rawio = io.FileIO(support.TESTFN, "wb")
            buffered = self.BufferedWriter(rawio)
            wrapper = self.TextIOWrapper(buffered, encoding="ascii")
            wrapper.write("456def")
            # Self-reference, so only the cycle collector can free it.
            wrapper.x = wrapper
            ref = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as f:
            contents = f.read()
            self.assertEqual(contents, b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        def make_wrapper():
            pair = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            return self.TextIOWrapper(pair, encoding="ascii")

        for _round in range(1000):
            first = make_wrapper()
            second = make_wrapper()
            # circular references
            first.buddy = second
            second.buddy = first
        support.gc_collect()

    def test_del__CHUNK_SIZE_SystemError(self):
        # Deleting _CHUNK_SIZE must raise AttributeError.
        wrapper = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
        with self.assertRaises(AttributeError):
            del wrapper._CHUNK_SIZE
3684
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003685
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Implementation module exercised by the inherited tests.
    io = pyio
    # Text looked for on stderr by the create-at-shutdown test.
    shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003689
3690
class IncrementalNewlineDecoderTest(unittest.TestCase):

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _roundtrip(data, expected, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(data, **kwargs), expected)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(data, **kwargs), expected)

        # A three-byte character fed whole, then byte by byte (twice),
        # then left truncated after its first byte.
        multibyte_steps = (
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),
            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),
            (b'\xe8', ""),
        )
        for data, expected in multibyte_steps:
            _roundtrip(data, expected)
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        _roundtrip(b'\n', "\n")
        _roundtrip(b'\r', "")
        _roundtrip(b'', "\n", final=True)
        _roundtrip(b'\r', "\n", final=True)

        newline_steps = (
            (b'\r', ""),
            (b'a', "\na"),
            (b'\r\r\n', "\n\n"),
            (b'\r', ""),
            (b'\r', "\n"),
            (b'\na', "\na"),
            (b'\xe8\xa2\x88\r\n', "\u8888\n"),
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\n', "\n"),
            (b'\xe8\xa2\x88\r', "\u8888"),
            (b'\n', "\n"),
        )
        for data, expected in newline_steps:
            _roundtrip(data, expected)

    def check_newline_decoding(self, decoder, encoding):
        # Feed *decoder* one unit at a time and watch its ``newlines``
        # attribute.  With an encoding the units are single bytes of the
        # encoded text; with encoding=None they are single characters.
        pieces = []
        if encoding is None:
            encoder = None
            def _feed(text):
                # Decode one char at a time
                pieces.extend(decoder.decode(char) for char in text)
        else:
            encoder = codecs.getincrementalencoder(encoding)()
            def _feed(text):
                # Decode one byte at a time
                pieces.extend(decoder.decode(bytes([byte]))
                              for byte in encoder.encode(text))

        # (value of decoder.newlines before feeding, text to feed)
        steps = (
            (None, "abc\n\r"),
            ('\n', "\nabc"),
            (('\n', '\r\n'), "abc\r"),
            (('\n', '\r\n'), "abc"),
            (('\r', '\n', '\r\n'), "abc\r"),
        )
        for newlines_before, text in steps:
            self.assertEqual(decoder.newlines, newlines_before)
            _feed(text)
        self.assertEqual("".join(pieces), "abc\n\nabcabc\nabcabc")

        # After reset() the record of seen newlines is cleared.
        decoder.reset()
        sample = "abc"
        if encoder is not None:
            encoder.reset()
            sample = encoder.encode(sample)
        self.assertEqual(decoder.decode(sample), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            inner = enc and codecs.getincrementaldecoder(enc)()
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)

        inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(wrapped)
        # setstate() must reject a non-tuple state.
        self.assertRaises(TypeError, wrapped.setstate, 42)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # Neither character is a newline, so both must come back
            # unchanged and ``newlines`` must stay None.
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)

        for translate in (False, True):
            _check(self.IncrementalNewlineDecoder(None, translate=translate))

    def test_translate(self):
        # issue 35062
        # Any non-zero value of *translate* must behave as true...
        for translate in (-2, -1, 1, 2):
            inner = codecs.getincrementaldecoder("utf-8")()
            wrapped = self.IncrementalNewlineDecoder(inner, translate)
            self.check_newline_decoding_utf8(wrapped)
        # ...and zero must leave the newlines untouched.
        inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=0)
        self.assertEqual(wrapped.decode(b"\r\r\n"), "\r\r\n")
3807
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; it only gives the shared tests a name
    # for the C flavour.
    pass
3810
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; it only gives the shared tests a name
    # for the pure-Python flavour.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003813
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003814
Guido van Rossum01a27522007-03-07 01:00:12 +00003815# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003816
class MiscIOTest(unittest.TestCase):

    def tearDown(self):
        # Every test works on the shared scratch file; remove it.
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Everything exported is either open(), an exception class, a
        # SEEK_* constant or an IOBase subclass.
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertIsNotNone(obj, name)
            if name == "open":
                continue
            if "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
                continue
            if name.startswith("SEEK_"):
                continue
            self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        raw_file = self.open(support.TESTFN, "wb", buffering=0)
        self.assertEqual(raw_file.mode, "wb")
        raw_file.close()

        # Opening with "U" is expected to emit a DeprecationWarning.
        with support.check_warnings(('', DeprecationWarning)):
            text_file = self.open(support.TESTFN, "U")
        buffered = text_file.buffer
        self.assertEqual(text_file.name, support.TESTFN)
        self.assertEqual(buffered.name, support.TESTFN)
        self.assertEqual(buffered.raw.name, support.TESTFN)
        self.assertEqual(text_file.mode, "U")
        self.assertEqual(buffered.mode, "rb")
        self.assertEqual(buffered.raw.mode, "rb")
        text_file.close()

        text_file = self.open(support.TESTFN, "w+")
        self.assertEqual(text_file.mode, "w+")
        self.assertEqual(text_file.buffer.mode, "rb+") # Does it really matter?
        self.assertEqual(text_file.buffer.raw.mode, "rb+")

        # A second object on the same descriptor reports the descriptor
        # number as its name.
        by_fd = self.open(text_file.fileno(), "wb", closefd=False)
        self.assertEqual(by_fd.mode, "wb")
        self.assertEqual(by_fd.raw.mode, "wb")
        self.assertEqual(by_fd.name, text_file.fileno())
        self.assertEqual(by_fd.raw.name, text_file.fileno())
        text_file.close()
        by_fd.close()

    def test_io_after_close(self):
        # Every I/O operation on a closed file raises ValueError.
        open_variants = [
            {"mode": "w"},
            {"mode": "wb"},
            {"mode": "w", "buffering": 1},
            {"mode": "w", "buffering": 2},
            {"mode": "wb", "buffering": 0},
            {"mode": "r"},
            {"mode": "rb"},
            {"mode": "r", "buffering": 1},
            {"mode": "r", "buffering": 2},
            {"mode": "rb", "buffering": 0},
            {"mode": "w+"},
            {"mode": "w+b"},
            {"mode": "w+", "buffering": 1},
            {"mode": "w+", "buffering": 2},
            {"mode": "w+b", "buffering": 0},
        ]
        for kwargs in open_variants:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            empty = b"" if "b" in kwargs['mode'] else ""
            # (method name, call arguments, must the method exist?)
            # Optional methods are only checked on the layers that
            # provide them.
            checks = [
                ("flush", (), True),
                ("fileno", (), True),
                ("isatty", (), True),
                ("__iter__", (), True),
                ("peek", (1,), False),
                ("read", (), True),
                ("read1", (1024,), False),
                ("read1", (), False),
                ("readall", (), False),
                ("readinto", (bytearray(1024),), False),
                ("readinto1", (bytearray(1024),), False),
                ("readline", (), True),
                ("readlines", (), True),
                ("readlines", (1,), True),
                ("seek", (0,), True),
                ("tell", (), True),
                ("truncate", (), True),
                ("write", (empty,), True),
                ("writelines", ([],), True),
            ]
            for method, call_args, required in checks:
                if required or hasattr(f, method):
                    self.assertRaises(ValueError, getattr(f, method),
                                      *call_args)
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        class C(str):
            pass
        payload = C("")
        error = self.BlockingIOError(1, payload)
        # Tie both objects into a reference cycle; collecting it must
        # release the payload.
        payload.b = error
        error.c = payload
        ref = weakref.ref(payload)
        del payload, error
        support.gc_collect()
        self.assertIsNone(ref(), ref)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        for name in ("IOBase", "RawIOBase", "BufferedIOBase", "TextIOBase"):
            self.assertIsInstance(getattr(self, name), abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        """Check which ABCs of *abcmodule* each kind of opened file matches."""
        abc_names = ("IOBase", "RawIOBase", "BufferedIOBase", "TextIOBase")
        # (mode, extra open() arguments, expected isinstance() result
        # for each entry of abc_names)
        cases = (
            ("wb", {"buffering": 0}, (True, True, False, False)),
            ("wb", {}, (True, False, True, False)),
            ("w", {}, (True, False, False, True)),
        )
        for mode, extra, expected in cases:
            with self.open(support.TESTFN, mode, **extra) as f:
                for abc_name, matches in zip(abc_names, expected):
                    if matches:
                        check = self.assertIsInstance
                    else:
                        check = self.assertNotIsInstance
                    check(f, getattr(abcmodule, abc_name))

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)

    def _check_warn_on_dealloc(self, *args, **kwargs):
        """Open a file, drop it unclosed and expect a ResourceWarning
        whose message contains the file's repr."""
        f = open(*args, **kwargs)
        description = repr(f)
        with self.assertWarns(ResourceWarning) as caught:
            f = None
            support.gc_collect()
        message = str(caught.warning.args[0])
        self.assertIn(description, message)

    def test_warn_on_dealloc(self):
        for open_args, open_kwargs in (
                (("wb",), {"buffering": 0}),
                (("wb",), {}),
                (("w",), {}),
        ):
            self._check_warn_on_dealloc(support.TESTFN, *open_args,
                                        **open_kwargs)

    def _check_warn_on_dealloc_fd(self, *args, **kwargs):
        """Like _check_warn_on_dealloc(), but on the read end of a pipe,
        and also check that closefd=False gives no warning."""
        fds = []
        def cleanup_fds():
            # Close whatever is still open; descriptors already closed
            # by the file objects report EBADF, which is ignored.
            for fd in fds:
                try:
                    os.close(fd)
                except OSError as exc:
                    if exc.errno != errno.EBADF:
                        raise
        self.addCleanup(cleanup_fds)
        read_fd, write_fd = os.pipe()
        fds.extend((read_fd, write_fd))
        self._check_warn_on_dealloc(read_fd, *args, **kwargs)
        # When using closefd=False, there's no warning
        read_fd, write_fd = os.pipe()
        fds.extend((read_fd, write_fd))
        with support.check_no_resource_warning(self):
            open(read_fd, *args, closefd=False, **kwargs)

    def test_warn_on_dealloc_fd(self):
        for open_args, open_kwargs in (
                (("rb",), {"buffering": 0}),
                (("rb",), {}),
                (("r",), {}),
        ):
            self._check_warn_on_dealloc_fd(*open_args, **open_kwargs)


    def test_pickling(self):
        # Pickling file objects is forbidden
        open_variants = [
            {"mode": "w"},
            {"mode": "wb"},
            {"mode": "wb", "buffering": 0},
            {"mode": "r"},
            {"mode": "rb"},
            {"mode": "rb", "buffering": 0},
            {"mode": "w+"},
            {"mode": "w+b"},
            {"mode": "w+b", "buffering": 0},
        ]
        protocols = range(pickle.HIGHEST_PROTOCOL + 1)
        for kwargs in open_variants:
            for protocol in protocols:
                with self.open(support.TESTFN, **kwargs) as f:
                    with self.assertRaises(TypeError):
                        pickle.dumps(f, protocol)

    def test_nonblock_pipe_write_bigbuf(self):
        self._test_nonblock_pipe_write(16*1024)

    def test_nonblock_pipe_write_smallbuf(self):
        self._test_nonblock_pipe_write(1024)

    @unittest.skipUnless(hasattr(os, 'set_blocking'),
                         'os.set_blocking() required for this test')
    def _test_nonblock_pipe_write(self, bufsize):
        """Push data through a non-blocking pipe with *bufsize*-sized
        buffers and check that everything sent is received."""
        sent = []
        received = []
        read_fd, write_fd = os.pipe()
        os.set_blocking(read_fd, False)
        os.set_blocking(write_fd, False)

        # To exercise all code paths in the C implementation we need
        # to play with buffer sizes.  For instance, if we choose a
        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
        # then we will never get a partial write of the buffer.
        rf = self.open(read_fd, mode='rb', closefd=True, buffering=bufsize)
        wf = self.open(write_fd, mode='wb', closefd=True, buffering=bufsize)

        with rf, wf:
            for repeat in 9999, 73, 7574:
                try:
                    counter = 0
                    # Write until the pipe refuses more data.
                    while True:
                        msg = bytes([counter % 26 + 97]) * repeat
                        sent.append(msg)
                        wf.write(msg)
                        counter += 1

                except self.BlockingIOError as exc:
                    self.assertEqual(exc.args[0], errno.EAGAIN)
                    self.assertEqual(exc.args[2], exc.characters_written)
                    # Only part of the last message was accepted.
                    accepted = exc.characters_written
                    sent[-1] = sent[-1][:accepted]
                    received.append(rf.read())
                    msg = b'BLOCKED'
                    wf.write(msg)
                    sent.append(msg)

            # Drain the write buffer, reading whenever the pipe is full.
            while True:
                try:
                    wf.flush()
                    break
                except self.BlockingIOError as exc:
                    self.assertEqual(exc.args[0], errno.EAGAIN)
                    self.assertEqual(exc.args[2], exc.characters_written)
                    self.assertEqual(exc.characters_written, 0)
                    received.append(rf.read())

            # Keep reading until read() returns None.
            received += iter(rf.read, None)

        sent, received = b''.join(sent), b''.join(received)
        self.assertEqual(sent, received)
        self.assertTrue(wf.closed)
        self.assertTrue(rf.closed)

    def test_create_fail(self):
        # 'x' mode fails if file is existing
        with self.open(support.TESTFN, 'w'):
            pass
        with self.assertRaises(FileExistsError):
            self.open(support.TESTFN, 'x')

    def test_create_writes(self):
        # 'x' mode opens for writing
        with self.open(support.TESTFN, 'xb') as writer:
            writer.write(b"spam")
        with self.open(support.TESTFN, 'rb') as reader:
            contents = reader.read()
            self.assertEqual(b"spam", contents)

    def test_open_allargs(self):
        # there used to be a buffer overflow in the parser for rawmode
        with self.assertRaises(ValueError):
            self.open(support.TESTFN, 'rwax+')
4083
4084
class CMiscIOTest(MiscIOTest):
    # Implementation module exercised by the inherited tests.
    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        # read() returns far more data than the two-byte buffer handed
        # to readinto() can hold; that must raise ValueError.
        class BadReader(self.io.BufferedIOBase):
            def read(self, n=-1):
                return b'x' * 10**6
        reader = BadReader()
        target = bytearray(2)
        with self.assertRaises(ValueError):
            reader.readinto(target)

    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        script = """if 1:
            import sys
            import time
            import threading
            from test.support import SuppressCrashReport

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            crash = SuppressCrashReport()
            crash.__enter__()
            # don't call __exit__(): the crash occurs at Python shutdown

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """.format(stream_name=stream_name)
        res, _ = run_python_until_end("-c", script)
        err = res.err.decode()
        if res.rc == 0:
            # Clean exit: stderr may hold nothing but the characters
            # written by the script itself.
            self.assertFalse(err.strip('.!'))
            return
        # Failure: should be a fatal error
        fatal_message = ("Fatal Python error: could not acquire lock "
                         "for <_io.BufferedWriter name='<{stream_name}>'> "
                         "at interpreter shutdown, possibly due to "
                         "daemon threads").format(stream_name=stream_name)
        self.assertIn(fatal_message, err)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stdout')

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stderr')
4141 self.check_daemon_threads_shutdown_deadlock('stderr')
4142
4143
class PyMiscIOTest(MiscIOTest):
    # Implementation module exercised by the inherited tests.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00004146
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004147
4148@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
4149class SignalsTest(unittest.TestCase):
4150
def setUp(self):
    # Install our SIGALRM handler and remember the one it replaces.
    previous = signal.signal(signal.SIGALRM, self.alarm_interrupt)
    self.oldalrm = previous
4153
def tearDown(self):
    # Put back the SIGALRM handler that setUp() saved.
    saved_handler = self.oldalrm
    signal.signal(signal.SIGALRM, saved_handler)
4156
def alarm_interrupt(self, sig, frame):
    """SIGALRM handler: always raises ZeroDivisionError."""
    # The division is only there to raise; tests look for this exception.
    1/0
4159
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004160 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
4161 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00004162 invokes the signal handler, and bubbles up the exception raised
4163 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004164 read_results = []
4165 def _read():
4166 s = os.read(r, 1)
4167 read_results.append(s)
Victor Stinner05c9d312018-12-18 23:52:39 +01004168
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004169 t = threading.Thread(target=_read)
4170 t.daemon = True
4171 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00004172 fdopen_kwargs["closefd"] = False
Victor Stinner9c182a12015-03-16 18:03:06 +01004173 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004174 try:
4175 wio = self.io.open(w, **fdopen_kwargs)
Victor Stinner05c9d312018-12-18 23:52:39 +01004176 if hasattr(signal, 'pthread_sigmask'):
4177 # create the thread with SIGALRM signal blocked
4178 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
4179 t.start()
4180 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM])
4181 else:
4182 t.start()
4183
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004184 # Fill the pipe enough that the write will be blocking.
4185 # It will be interrupted by the timer armed above. Since the
4186 # other thread has read one byte, the low-level write will
4187 # return with a successful (partial) result rather than an EINTR.
4188 # The buffered IO layer must check for pending signal
4189 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02004190 signal.alarm(1)
4191 try:
Victor Stinner9c182a12015-03-16 18:03:06 +01004192 self.assertRaises(ZeroDivisionError, wio.write, large_data)
Victor Stinner775b2dd2013-07-15 19:53:13 +02004193 finally:
4194 signal.alarm(0)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03004195 t.join()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004196 # We got one byte, get another one and check that it isn't a
4197 # repeat of the first one.
4198 read_results.append(os.read(r, 1))
4199 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
4200 finally:
4201 os.close(w)
4202 os.close(r)
4203 # This is deliberate. If we didn't close the file descriptor
4204 # before closing wio, wio would try to flush its internal
4205 # buffer, and block again.
4206 try:
4207 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004208 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004209 if e.errno != errno.EBADF:
4210 raise
4211
4212 def test_interrupted_write_unbuffered(self):
4213 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
4214
4215 def test_interrupted_write_buffered(self):
4216 self.check_interrupted_write(b"xy", b"xy", mode="wb")
4217
4218 def test_interrupted_write_text(self):
4219 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
4220
Brett Cannon31f59292011-02-21 19:29:56 +00004221 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004222 def check_reentrant_write(self, data, **fdopen_kwargs):
4223 def on_alarm(*args):
4224 # Will be called reentrantly from the same thread
4225 wio.write(data)
4226 1/0
4227 signal.signal(signal.SIGALRM, on_alarm)
4228 r, w = os.pipe()
4229 wio = self.io.open(w, **fdopen_kwargs)
4230 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004231 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004232 # Either the reentrant call to wio.write() fails with RuntimeError,
4233 # or the signal handler raises ZeroDivisionError.
4234 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
4235 while 1:
4236 for i in range(100):
4237 wio.write(data)
4238 wio.flush()
4239 # Make sure the buffer doesn't fill up and block further writes
4240 os.read(r, len(data) * 100)
4241 exc = cm.exception
4242 if isinstance(exc, RuntimeError):
4243 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
4244 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004245 signal.alarm(0)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004246 wio.close()
4247 os.close(r)
4248
4249 def test_reentrant_write_buffered(self):
4250 self.check_reentrant_write(b"xy", mode="wb")
4251
4252 def test_reentrant_write_text(self):
4253 self.check_reentrant_write("xy", mode="w", encoding="ascii")
4254
Antoine Pitrou707ce822011-02-25 21:24:11 +00004255 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
4256 """Check that a buffered read, when it gets interrupted (either
4257 returning a partial result or EINTR), properly invokes the signal
4258 handler and retries if the latter returned successfully."""
4259 r, w = os.pipe()
4260 fdopen_kwargs["closefd"] = False
4261 def alarm_handler(sig, frame):
4262 os.write(w, b"bar")
4263 signal.signal(signal.SIGALRM, alarm_handler)
4264 try:
4265 rio = self.io.open(r, **fdopen_kwargs)
4266 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004267 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004268 # Expected behaviour:
4269 # - first raw read() returns partial b"foo"
4270 # - second raw read() returns EINTR
4271 # - third raw read() returns b"bar"
4272 self.assertEqual(decode(rio.read(6)), "foobar")
4273 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004274 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004275 rio.close()
4276 os.close(w)
4277 os.close(r)
4278
Antoine Pitrou20db5112011-08-19 20:32:34 +02004279 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004280 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
4281 mode="rb")
4282
Antoine Pitrou20db5112011-08-19 20:32:34 +02004283 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004284 self.check_interrupted_read_retry(lambda x: x,
4285 mode="r")
4286
Antoine Pitrou707ce822011-02-25 21:24:11 +00004287 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
4288 """Check that a buffered write, when it gets interrupted (either
4289 returning a partial result or EINTR), properly invokes the signal
4290 handler and retries if the latter returned successfully."""
4291 select = support.import_module("select")
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004292
Antoine Pitrou707ce822011-02-25 21:24:11 +00004293 # A quantity that exceeds the buffer size of an anonymous pipe's
4294 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02004295 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00004296 r, w = os.pipe()
4297 fdopen_kwargs["closefd"] = False
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004298
Antoine Pitrou707ce822011-02-25 21:24:11 +00004299 # We need a separate thread to read from the pipe and allow the
4300 # write() to finish. This thread is started after the SIGALRM is
4301 # received (forcing a first EINTR in write()).
4302 read_results = []
4303 write_finished = False
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004304 error = None
Antoine Pitrou707ce822011-02-25 21:24:11 +00004305 def _read():
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004306 try:
4307 while not write_finished:
4308 while r in select.select([r], [], [], 1.0)[0]:
4309 s = os.read(r, 1024)
4310 read_results.append(s)
4311 except BaseException as exc:
4312 nonlocal error
4313 error = exc
Antoine Pitrou707ce822011-02-25 21:24:11 +00004314 t = threading.Thread(target=_read)
4315 t.daemon = True
4316 def alarm1(sig, frame):
4317 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004318 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004319 def alarm2(sig, frame):
4320 t.start()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004321
4322 large_data = item * N
Antoine Pitrou707ce822011-02-25 21:24:11 +00004323 signal.signal(signal.SIGALRM, alarm1)
4324 try:
4325 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004326 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004327 # Expected behaviour:
4328 # - first raw write() is partial (because of the limited pipe buffer
4329 # and the first alarm)
4330 # - second raw write() returns EINTR (because of the second alarm)
4331 # - subsequent write()s are successful (either partial or complete)
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004332 written = wio.write(large_data)
4333 self.assertEqual(N, written)
4334
Antoine Pitrou707ce822011-02-25 21:24:11 +00004335 wio.flush()
4336 write_finished = True
4337 t.join()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004338
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004339 self.assertIsNone(error)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004340 self.assertEqual(N, sum(len(x) for x in read_results))
4341 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004342 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004343 write_finished = True
4344 os.close(w)
4345 os.close(r)
4346 # This is deliberate. If we didn't close the file descriptor
4347 # before closing wio, wio would try to flush its internal
4348 # buffer, and could block (in case of failure).
4349 try:
4350 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004351 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00004352 if e.errno != errno.EBADF:
4353 raise
4354
Antoine Pitrou20db5112011-08-19 20:32:34 +02004355 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004356 self.check_interrupted_write_retry(b"x", mode="wb")
4357
Antoine Pitrou20db5112011-08-19 20:32:34 +02004358 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004359 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4360
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004361
class CSignalsTest(SignalsTest):
    # Run the SignalsTest checks against the C implementation (module io).
    io = io
4364
class PySignalsTest(SignalsTest):
    # Run the SignalsTest checks against the pure-Python implementation.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # reentrant-write tests are disabled by overriding them with None.
    test_reentrant_write_text = None
    test_reentrant_write_buffered = None
4372
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004373
def load_tests(*args):
    """Return the test suite for this module (unittest ``load_tests`` hook).

    Before the tests are collected, the names exported by the io
    implementation under test and the matching mock classes are set as
    class attributes on every listed test class whose name starts with "C"
    (C implementation) or "Py" (pure-Python implementation).  Classes with
    any other name are collected unchanged.  The arguments are ignored.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
             SlowFlushRawIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name : getattr(io, name) for name in all_members}
    py_io_ns = {name : getattr(pyio, name) for name in all_members}
    globs = globals()
    # Each mock has "C"- and "Py"-prefixed variants at module level; expose
    # the right variant under the unprefixed name.
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # unittest.makeSuite() is deprecated since Python 3.11 and was removed in
    # 3.13; the default loader collects the same "test*" methods.
    load_case = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite([load_case(test) for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004410
if __name__ == "__main__":
    # Run this module's tests when the file is executed as a script.
    unittest.main()