blob: a03a7f78109c234fb36b2294b85d7cf15c77b183 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020031import threading
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000032import time
Guido van Rossum28524c72007-02-27 05:47:44 +000033import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000034import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020035import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020036from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000038from test import support
Berker Peksagce643912015-05-06 06:33:17 +030039from test.support.script_helper import assert_python_ok, run_python_until_end
Serhiy Storchakab21d1552018-03-02 11:53:51 +020040from test.support import FakePath
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000041
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000042import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000043import io # C implementation of io
44import _pyio as pyio # Python implementation of io
Guido van Rossuma9e20242007-03-08 00:43:48 +000045
Martin Panter6bb91f32016-05-28 00:41:57 +000046try:
47 import ctypes
48except ImportError:
49 def byteslike(*pos, **kw):
50 return array.array("b", bytes(*pos, **kw))
51else:
52 def byteslike(*pos, **kw):
53 """Create a bytes-like object having no string or sequence methods"""
54 data = bytes(*pos, **kw)
55 obj = EmptyStruct()
56 ctypes.resize(obj, len(data))
57 memoryview(obj).cast("B")[:] = data
58 return obj
59 class EmptyStruct(ctypes.Structure):
60 pass
61
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000062def _default_chunk_size():
63 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000064 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000065 return f._CHUNK_SIZE
66
67
Antoine Pitrou328ec742010-09-14 18:37:24 +000068class MockRawIOWithoutRead:
69 """A RawIO implementation without read(), so as to exercise the default
70 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000071
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000072 def __init__(self, read_stack=()):
73 self._read_stack = list(read_stack)
74 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000075 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000076 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000077
Guido van Rossum01a27522007-03-07 01:00:12 +000078 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000079 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000080 return len(b)
81
82 def writable(self):
83 return True
84
Guido van Rossum68bbcd22007-02-27 17:19:33 +000085 def fileno(self):
86 return 42
87
88 def readable(self):
89 return True
90
Guido van Rossum01a27522007-03-07 01:00:12 +000091 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000092 return True
93
Guido van Rossum01a27522007-03-07 01:00:12 +000094 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000095 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000096
97 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000098 return 0 # same comment as above
99
100 def readinto(self, buf):
101 self._reads += 1
102 max_len = len(buf)
103 try:
104 data = self._read_stack[0]
105 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000106 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000107 return 0
108 if data is None:
109 del self._read_stack[0]
110 return None
111 n = len(data)
112 if len(data) <= max_len:
113 del self._read_stack[0]
114 buf[:n] = data
115 return n
116 else:
117 buf[:] = data[:max_len]
118 self._read_stack[0] = data[max_len:]
119 return max_len
120
121 def truncate(self, pos=None):
122 return pos
123
Antoine Pitrou328ec742010-09-14 18:37:24 +0000124class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
125 pass
126
127class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
128 pass
129
130
131class MockRawIO(MockRawIOWithoutRead):
132
133 def read(self, n=None):
134 self._reads += 1
135 try:
136 return self._read_stack.pop(0)
137 except:
138 self._extraneous_reads += 1
139 return b""
140
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000141class CMockRawIO(MockRawIO, io.RawIOBase):
142 pass
143
144class PyMockRawIO(MockRawIO, pyio.RawIOBase):
145 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000146
Guido van Rossuma9e20242007-03-08 00:43:48 +0000147
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000148class MisbehavedRawIO(MockRawIO):
149 def write(self, b):
150 return super().write(b) * 2
151
152 def read(self, n=None):
153 return super().read(n) * 2
154
155 def seek(self, pos, whence):
156 return -123
157
158 def tell(self):
159 return -456
160
161 def readinto(self, buf):
162 super().readinto(buf)
163 return len(buf) * 5
164
165class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
166 pass
167
168class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
169 pass
170
171
benfogle9703f092017-11-10 16:03:40 -0500172class SlowFlushRawIO(MockRawIO):
173 def __init__(self):
174 super().__init__()
175 self.in_flush = threading.Event()
176
177 def flush(self):
178 self.in_flush.set()
179 time.sleep(0.25)
180
181class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
182 pass
183
184class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
185 pass
186
187
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000188class CloseFailureIO(MockRawIO):
189 closed = 0
190
191 def close(self):
192 if not self.closed:
193 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200194 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000195
196class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
197 pass
198
199class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
200 pass
201
202
203class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000204
205 def __init__(self, data):
206 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000207 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000208
209 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000210 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000211 self.read_history.append(None if res is None else len(res))
212 return res
213
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000214 def readinto(self, b):
215 res = super().readinto(b)
216 self.read_history.append(res)
217 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000218
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000219class CMockFileIO(MockFileIO, io.BytesIO):
220 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000221
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000222class PyMockFileIO(MockFileIO, pyio.BytesIO):
223 pass
224
225
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000226class MockUnseekableIO:
227 def seekable(self):
228 return False
229
230 def seek(self, *args):
231 raise self.UnsupportedOperation("not seekable")
232
233 def tell(self, *args):
234 raise self.UnsupportedOperation("not seekable")
235
Martin Panter754aab22016-03-31 07:21:56 +0000236 def truncate(self, *args):
237 raise self.UnsupportedOperation("not seekable")
238
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000239class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
240 UnsupportedOperation = io.UnsupportedOperation
241
242class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
243 UnsupportedOperation = pyio.UnsupportedOperation
244
245
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000246class MockNonBlockWriterIO:
247
248 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000249 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000250 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000251
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000252 def pop_written(self):
253 s = b"".join(self._write_stack)
254 self._write_stack[:] = []
255 return s
256
257 def block_on(self, char):
258 """Block when a given char is encountered."""
259 self._blocker_char = char
260
261 def readable(self):
262 return True
263
264 def seekable(self):
265 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000266
Guido van Rossum01a27522007-03-07 01:00:12 +0000267 def writable(self):
268 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000269
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000270 def write(self, b):
271 b = bytes(b)
272 n = -1
273 if self._blocker_char:
274 try:
275 n = b.index(self._blocker_char)
276 except ValueError:
277 pass
278 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100279 if n > 0:
280 # write data up to the first blocker
281 self._write_stack.append(b[:n])
282 return n
283 else:
284 # cancel blocker and indicate would block
285 self._blocker_char = None
286 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000287 self._write_stack.append(b)
288 return len(b)
289
290class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
291 BlockingIOError = io.BlockingIOError
292
293class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
294 BlockingIOError = pyio.BlockingIOError
295
Guido van Rossuma9e20242007-03-08 00:43:48 +0000296
Guido van Rossum28524c72007-02-27 05:47:44 +0000297class IOTest(unittest.TestCase):
298
Neal Norwitze7789b12008-03-24 06:18:09 +0000299 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000300 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000301
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000302 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000303 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000304
Guido van Rossum28524c72007-02-27 05:47:44 +0000305 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000306 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000307 f.truncate(0)
308 self.assertEqual(f.tell(), 5)
309 f.seek(0)
310
311 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000312 self.assertEqual(f.seek(0), 0)
313 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000314 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000315 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000316 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000317 buffer = bytearray(b" world\n\n\n")
318 self.assertEqual(f.write(buffer), 9)
319 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000320 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000321 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000322 self.assertEqual(f.seek(-1, 2), 13)
323 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000324
Guido van Rossum87429772007-04-10 21:06:59 +0000325 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000326 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000327 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000328
Guido van Rossum9b76da62007-04-11 01:09:03 +0000329 def read_ops(self, f, buffered=False):
330 data = f.read(5)
331 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000332 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000333 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000334 self.assertEqual(bytes(data), b" worl")
335 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000336 self.assertEqual(f.readinto(data), 2)
337 self.assertEqual(len(data), 5)
338 self.assertEqual(data[:2], b"d\n")
339 self.assertEqual(f.seek(0), 0)
340 self.assertEqual(f.read(20), b"hello world\n")
341 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000342 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000343 self.assertEqual(f.seek(-6, 2), 6)
344 self.assertEqual(f.read(5), b"world")
345 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000346 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000347 self.assertEqual(f.seek(-6, 1), 5)
348 self.assertEqual(f.read(5), b" worl")
349 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000350 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000351 if buffered:
352 f.seek(0)
353 self.assertEqual(f.read(), b"hello world\n")
354 f.seek(6)
355 self.assertEqual(f.read(), b"world\n")
356 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000357 f.seek(0)
358 data = byteslike(5)
359 self.assertEqual(f.readinto1(data), 5)
360 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000361
Guido van Rossum34d69e52007-04-10 20:08:41 +0000362 LARGE = 2**31
363
Guido van Rossum53807da2007-04-10 19:01:47 +0000364 def large_file_ops(self, f):
365 assert f.readable()
366 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100367 try:
368 self.assertEqual(f.seek(self.LARGE), self.LARGE)
369 except (OverflowError, ValueError):
370 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000371 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000372 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000373 self.assertEqual(f.tell(), self.LARGE + 3)
374 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000375 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000376 self.assertEqual(f.tell(), self.LARGE + 2)
377 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000378 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000379 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000380 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
381 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000382 self.assertEqual(f.read(2), b"x")
383
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000384 def test_invalid_operations(self):
385 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000386 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000387 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000388 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000389 self.assertRaises(exc, fp.read)
390 self.assertRaises(exc, fp.readline)
391 with self.open(support.TESTFN, "wb", buffering=0) as fp:
392 self.assertRaises(exc, fp.read)
393 self.assertRaises(exc, fp.readline)
394 with self.open(support.TESTFN, "rb", buffering=0) as fp:
395 self.assertRaises(exc, fp.write, b"blah")
396 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000397 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000398 self.assertRaises(exc, fp.write, b"blah")
399 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000400 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000401 self.assertRaises(exc, fp.write, "blah")
402 self.assertRaises(exc, fp.writelines, ["blah\n"])
403 # Non-zero seeking from current or end pos
404 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
405 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000406
Martin Panter754aab22016-03-31 07:21:56 +0000407 def test_optional_abilities(self):
408 # Test for OSError when optional APIs are not supported
409 # The purpose of this test is to try fileno(), reading, writing and
410 # seeking operations with various objects that indicate they do not
411 # support these operations.
412
413 def pipe_reader():
414 [r, w] = os.pipe()
415 os.close(w) # So that read() is harmless
416 return self.FileIO(r, "r")
417
418 def pipe_writer():
419 [r, w] = os.pipe()
420 self.addCleanup(os.close, r)
421 # Guarantee that we can write into the pipe without blocking
422 thread = threading.Thread(target=os.read, args=(r, 100))
423 thread.start()
424 self.addCleanup(thread.join)
425 return self.FileIO(w, "w")
426
427 def buffered_reader():
428 return self.BufferedReader(self.MockUnseekableIO())
429
430 def buffered_writer():
431 return self.BufferedWriter(self.MockUnseekableIO())
432
433 def buffered_random():
434 return self.BufferedRandom(self.BytesIO())
435
436 def buffered_rw_pair():
437 return self.BufferedRWPair(self.MockUnseekableIO(),
438 self.MockUnseekableIO())
439
440 def text_reader():
441 class UnseekableReader(self.MockUnseekableIO):
442 writable = self.BufferedIOBase.writable
443 write = self.BufferedIOBase.write
444 return self.TextIOWrapper(UnseekableReader(), "ascii")
445
446 def text_writer():
447 class UnseekableWriter(self.MockUnseekableIO):
448 readable = self.BufferedIOBase.readable
449 read = self.BufferedIOBase.read
450 return self.TextIOWrapper(UnseekableWriter(), "ascii")
451
452 tests = (
453 (pipe_reader, "fr"), (pipe_writer, "fw"),
454 (buffered_reader, "r"), (buffered_writer, "w"),
455 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
456 (text_reader, "r"), (text_writer, "w"),
457 (self.BytesIO, "rws"), (self.StringIO, "rws"),
458 )
459 for [test, abilities] in tests:
Martin Panter754aab22016-03-31 07:21:56 +0000460 with self.subTest(test), test() as obj:
461 readable = "r" in abilities
462 self.assertEqual(obj.readable(), readable)
463 writable = "w" in abilities
464 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000465
466 if isinstance(obj, self.TextIOBase):
467 data = "3"
468 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
469 data = b"3"
470 else:
471 self.fail("Unknown base class")
472
473 if "f" in abilities:
474 obj.fileno()
475 else:
476 self.assertRaises(OSError, obj.fileno)
477
478 if readable:
479 obj.read(1)
480 obj.read()
481 else:
482 self.assertRaises(OSError, obj.read, 1)
483 self.assertRaises(OSError, obj.read)
484
485 if writable:
486 obj.write(data)
487 else:
488 self.assertRaises(OSError, obj.write, data)
489
Martin Panter3ee147f2016-03-31 21:05:31 +0000490 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000491 pipe_reader, pipe_writer):
492 # Pipes seem to appear as seekable on Windows
493 continue
494 seekable = "s" in abilities
495 self.assertEqual(obj.seekable(), seekable)
496
Martin Panter754aab22016-03-31 07:21:56 +0000497 if seekable:
498 obj.tell()
499 obj.seek(0)
500 else:
501 self.assertRaises(OSError, obj.tell)
502 self.assertRaises(OSError, obj.seek, 0)
503
504 if writable and seekable:
505 obj.truncate()
506 obj.truncate(0)
507 else:
508 self.assertRaises(OSError, obj.truncate)
509 self.assertRaises(OSError, obj.truncate, 0)
510
Antoine Pitrou13348842012-01-29 18:36:34 +0100511 def test_open_handles_NUL_chars(self):
512 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300513 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100514
515 bytes_fn = bytes(fn_with_NUL, 'ascii')
516 with warnings.catch_warnings():
517 warnings.simplefilter("ignore", DeprecationWarning)
518 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100519
Guido van Rossum28524c72007-02-27 05:47:44 +0000520 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000521 with self.open(support.TESTFN, "wb", buffering=0) as f:
522 self.assertEqual(f.readable(), False)
523 self.assertEqual(f.writable(), True)
524 self.assertEqual(f.seekable(), True)
525 self.write_ops(f)
526 with self.open(support.TESTFN, "rb", buffering=0) as f:
527 self.assertEqual(f.readable(), True)
528 self.assertEqual(f.writable(), False)
529 self.assertEqual(f.seekable(), True)
530 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000531
Guido van Rossum87429772007-04-10 21:06:59 +0000532 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000533 with self.open(support.TESTFN, "wb") as f:
534 self.assertEqual(f.readable(), False)
535 self.assertEqual(f.writable(), True)
536 self.assertEqual(f.seekable(), True)
537 self.write_ops(f)
538 with self.open(support.TESTFN, "rb") as f:
539 self.assertEqual(f.readable(), True)
540 self.assertEqual(f.writable(), False)
541 self.assertEqual(f.seekable(), True)
542 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000543
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000544 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000545 with self.open(support.TESTFN, "wb") as f:
546 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
547 with self.open(support.TESTFN, "rb") as f:
548 self.assertEqual(f.readline(), b"abc\n")
549 self.assertEqual(f.readline(10), b"def\n")
550 self.assertEqual(f.readline(2), b"xy")
551 self.assertEqual(f.readline(4), b"zzy\n")
552 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000553 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000554 self.assertRaises(TypeError, f.readline, 5.3)
555 with self.open(support.TESTFN, "r") as f:
556 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000557
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300558 def test_readline_nonsizeable(self):
559 # Issue #30061
560 # Crash when readline() returns an object without __len__
561 class R(self.IOBase):
562 def readline(self):
563 return None
564 self.assertRaises((TypeError, StopIteration), next, R())
565
566 def test_next_nonsizeable(self):
567 # Issue #30061
568 # Crash when __next__() returns an object without __len__
569 class R(self.IOBase):
570 def __next__(self):
571 return None
572 self.assertRaises(TypeError, R().readlines, 1)
573
Guido van Rossum28524c72007-02-27 05:47:44 +0000574 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000575 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000576 self.write_ops(f)
577 data = f.getvalue()
578 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000579 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000580 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000581
Guido van Rossum53807da2007-04-10 19:01:47 +0000582 def test_large_file_ops(self):
Mike53f7a7c2017-12-14 14:04:53 +0300583 # On Windows and Mac OSX this test consumes large resources; It takes
Victor Stinner8c663fd2017-11-08 14:44:44 -0800584 # a long time to build the >2 GiB file and takes >2 GiB of disk space
Guido van Rossum34d69e52007-04-10 20:08:41 +0000585 # therefore the resource must be enabled to run this test.
Victor Stinner937ee9e2018-06-26 02:11:06 +0200586 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600587 support.requires(
588 'largefile',
589 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000590 with self.open(support.TESTFN, "w+b", 0) as f:
591 self.large_file_ops(f)
592 with self.open(support.TESTFN, "w+b") as f:
593 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000594
595 def test_with_open(self):
596 for bufsize in (0, 1, 100):
597 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000598 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000599 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000600 self.assertEqual(f.closed, True)
601 f = None
602 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000603 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000604 1/0
605 except ZeroDivisionError:
606 self.assertEqual(f.closed, True)
607 else:
608 self.fail("1/0 didn't raise an exception")
609
Antoine Pitrou08838b62009-01-21 00:55:13 +0000610 # issue 5008
611 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000612 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000613 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000614 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000615 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000616 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000617 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000618 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300619 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000620
Guido van Rossum87429772007-04-10 21:06:59 +0000621 def test_destructor(self):
622 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000623 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000624 def __del__(self):
625 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000626 try:
627 f = super().__del__
628 except AttributeError:
629 pass
630 else:
631 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000632 def close(self):
633 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000634 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000635 def flush(self):
636 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000637 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000638 with support.check_warnings(('', ResourceWarning)):
639 f = MyFileIO(support.TESTFN, "wb")
640 f.write(b"xxx")
641 del f
642 support.gc_collect()
643 self.assertEqual(record, [1, 2, 3])
644 with self.open(support.TESTFN, "rb") as f:
645 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000646
647 def _check_base_destructor(self, base):
648 record = []
649 class MyIO(base):
650 def __init__(self):
651 # This exercises the availability of attributes on object
652 # destruction.
653 # (in the C version, close() is called by the tp_dealloc
654 # function, not by __del__)
655 self.on_del = 1
656 self.on_close = 2
657 self.on_flush = 3
658 def __del__(self):
659 record.append(self.on_del)
660 try:
661 f = super().__del__
662 except AttributeError:
663 pass
664 else:
665 f()
666 def close(self):
667 record.append(self.on_close)
668 super().close()
669 def flush(self):
670 record.append(self.on_flush)
671 super().flush()
672 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000673 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000674 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000675 self.assertEqual(record, [1, 2, 3])
676
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000677 def test_IOBase_destructor(self):
678 self._check_base_destructor(self.IOBase)
679
680 def test_RawIOBase_destructor(self):
681 self._check_base_destructor(self.RawIOBase)
682
683 def test_BufferedIOBase_destructor(self):
684 self._check_base_destructor(self.BufferedIOBase)
685
686 def test_TextIOBase_destructor(self):
687 self._check_base_destructor(self.TextIOBase)
688
Guido van Rossum87429772007-04-10 21:06:59 +0000689 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000690 with self.open(support.TESTFN, "wb") as f:
691 f.write(b"xxx")
692 with self.open(support.TESTFN, "rb") as f:
693 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000694
Guido van Rossumd4103952007-04-12 05:44:49 +0000695 def test_array_writes(self):
696 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000697 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000698 def check(f):
699 with f:
700 self.assertEqual(f.write(a), n)
701 f.writelines((a,))
702 check(self.BytesIO())
703 check(self.FileIO(support.TESTFN, "w"))
704 check(self.BufferedWriter(self.MockRawIO()))
705 check(self.BufferedRandom(self.MockRawIO()))
706 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000707
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000708 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000709 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000710 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000711
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000712 def test_read_closed(self):
713 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000714 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000715 with self.open(support.TESTFN, "r") as f:
716 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000717 self.assertEqual(file.read(), "egg\n")
718 file.seek(0)
719 file.close()
720 self.assertRaises(ValueError, file.read)
721
722 def test_no_closefd_with_filename(self):
723 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000724 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000725
726 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000727 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000728 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000729 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000730 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000731 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000732 self.assertEqual(file.buffer.raw.closefd, False)
733
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000734 def test_garbage_collection(self):
735 # FileIO objects are collected, and collecting them flushes
736 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000737 with support.check_warnings(('', ResourceWarning)):
738 f = self.FileIO(support.TESTFN, "wb")
739 f.write(b"abcxxx")
740 f.f = f
741 wr = weakref.ref(f)
742 del f
743 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300744 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000745 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000746 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000747
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000748 def test_unbounded_file(self):
749 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
750 zero = "/dev/zero"
751 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000752 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000753 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000754 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000755 if support.real_max_memuse < support._2G:
Victor Stinner8c663fd2017-11-08 14:44:44 -0800756 self.skipTest("test requires at least 2 GiB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000757 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000758 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000759 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000760 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000761 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000762 self.assertRaises(OverflowError, f.read)
763
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200764 def check_flush_error_on_close(self, *args, **kwargs):
765 # Test that the file is closed despite failed flush
766 # and that flush() is called before file closed.
767 f = self.open(*args, **kwargs)
768 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000769 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200770 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200771 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000772 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200773 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600774 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200775 self.assertTrue(closed) # flush() called
776 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200777 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200778
779 def test_flush_error_on_close(self):
780 # raw file
781 # Issue #5700: io.FileIO calls flush() after file closed
782 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
783 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
784 self.check_flush_error_on_close(fd, 'wb', buffering=0)
785 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
786 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
787 os.close(fd)
788 # buffered io
789 self.check_flush_error_on_close(support.TESTFN, 'wb')
790 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
791 self.check_flush_error_on_close(fd, 'wb')
792 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
793 self.check_flush_error_on_close(fd, 'wb', closefd=False)
794 os.close(fd)
795 # text io
796 self.check_flush_error_on_close(support.TESTFN, 'w')
797 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
798 self.check_flush_error_on_close(fd, 'w')
799 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
800 self.check_flush_error_on_close(fd, 'w', closefd=False)
801 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000802
803 def test_multi_close(self):
804 f = self.open(support.TESTFN, "wb", buffering=0)
805 f.close()
806 f.close()
807 f.close()
808 self.assertRaises(ValueError, f.flush)
809
Antoine Pitrou328ec742010-09-14 18:37:24 +0000810 def test_RawIOBase_read(self):
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530811 # Exercise the default limited RawIOBase.read(n) implementation (which
812 # calls readinto() internally).
Antoine Pitrou328ec742010-09-14 18:37:24 +0000813 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
814 self.assertEqual(rawio.read(2), b"ab")
815 self.assertEqual(rawio.read(2), b"c")
816 self.assertEqual(rawio.read(2), b"d")
817 self.assertEqual(rawio.read(2), None)
818 self.assertEqual(rawio.read(2), b"ef")
819 self.assertEqual(rawio.read(2), b"g")
820 self.assertEqual(rawio.read(2), None)
821 self.assertEqual(rawio.read(2), b"")
822
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400823 def test_types_have_dict(self):
824 test = (
825 self.IOBase(),
826 self.RawIOBase(),
827 self.TextIOBase(),
828 self.StringIO(),
829 self.BytesIO()
830 )
831 for obj in test:
832 self.assertTrue(hasattr(obj, "__dict__"))
833
Ross Lagerwall59142db2011-10-31 20:34:46 +0200834 def test_opener(self):
835 with self.open(support.TESTFN, "w") as f:
836 f.write("egg\n")
837 fd = os.open(support.TESTFN, os.O_RDONLY)
838 def opener(path, flags):
839 return fd
840 with self.open("non-existent", "r", opener=opener) as f:
841 self.assertEqual(f.read(), "egg\n")
842
Barry Warsaw480e2852016-06-08 17:47:26 -0400843 def test_bad_opener_negative_1(self):
844 # Issue #27066.
845 def badopener(fname, flags):
846 return -1
847 with self.assertRaises(ValueError) as cm:
848 open('non-existent', 'r', opener=badopener)
849 self.assertEqual(str(cm.exception), 'opener returned -1')
850
851 def test_bad_opener_other_negative(self):
852 # Issue #27066.
853 def badopener(fname, flags):
854 return -2
855 with self.assertRaises(ValueError) as cm:
856 open('non-existent', 'r', opener=badopener)
857 self.assertEqual(str(cm.exception), 'opener returned -2')
858
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200859 def test_fileio_closefd(self):
860 # Issue #4841
861 with self.open(__file__, 'rb') as f1, \
862 self.open(__file__, 'rb') as f2:
863 fileio = self.FileIO(f1.fileno(), closefd=False)
864 # .__init__() must not close f1
865 fileio.__init__(f2.fileno(), closefd=False)
866 f1.readline()
867 # .close() must not close f2
868 fileio.close()
869 f2.readline()
870
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300871 def test_nonbuffered_textio(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200872 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300873 with self.assertRaises(ValueError):
874 self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300875
876 def test_invalid_newline(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200877 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300878 with self.assertRaises(ValueError):
879 self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300880
Martin Panter6bb91f32016-05-28 00:41:57 +0000881 def test_buffered_readinto_mixin(self):
882 # Test the implementation provided by BufferedIOBase
883 class Stream(self.BufferedIOBase):
884 def read(self, size):
885 return b"12345"
886 read1 = read
887 stream = Stream()
888 for method in ("readinto", "readinto1"):
889 with self.subTest(method):
890 buffer = byteslike(5)
891 self.assertEqual(getattr(stream, method)(buffer), 5)
892 self.assertEqual(bytes(buffer), b"12345")
893
Ethan Furmand62548a2016-06-04 14:38:43 -0700894 def test_fspath_support(self):
Ethan Furmand62548a2016-06-04 14:38:43 -0700895 def check_path_succeeds(path):
896 with self.open(path, "w") as f:
897 f.write("egg\n")
898
899 with self.open(path, "r") as f:
900 self.assertEqual(f.read(), "egg\n")
901
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200902 check_path_succeeds(FakePath(support.TESTFN))
903 check_path_succeeds(FakePath(support.TESTFN.encode('utf-8')))
Ethan Furmand62548a2016-06-04 14:38:43 -0700904
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200905 with self.open(support.TESTFN, "w") as f:
906 bad_path = FakePath(f.fileno())
907 with self.assertRaises(TypeError):
908 self.open(bad_path, 'w')
909
910 bad_path = FakePath(None)
Brett Cannonc78ca1e2016-06-24 12:03:43 -0700911 with self.assertRaises(TypeError):
Ethan Furmand62548a2016-06-04 14:38:43 -0700912 self.open(bad_path, 'w')
913
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200914 bad_path = FakePath(FloatingPointError)
915 with self.assertRaises(FloatingPointError):
916 self.open(bad_path, 'w')
917
Ethan Furmand62548a2016-06-04 14:38:43 -0700918 # ensure that refcounting is correct with some error conditions
919 with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200920 self.open(FakePath(support.TESTFN), 'rwxa')
Ethan Furmand62548a2016-06-04 14:38:43 -0700921
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530922 def test_RawIOBase_readall(self):
923 # Exercise the default unlimited RawIOBase.read() and readall()
924 # implementations.
925 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
926 self.assertEqual(rawio.read(), b"abcdefg")
927 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
928 self.assertEqual(rawio.readall(), b"abcdefg")
929
930 def test_BufferedIOBase_readinto(self):
931 # Exercise the default BufferedIOBase.readinto() and readinto1()
932 # implementations (which call read() or read1() internally).
933 class Reader(self.BufferedIOBase):
934 def __init__(self, avail):
935 self.avail = avail
936 def read(self, size):
937 result = self.avail[:size]
938 self.avail = self.avail[size:]
939 return result
940 def read1(self, size):
941 """Returns no more than 5 bytes at once"""
942 return self.read(min(size, 5))
943 tests = (
944 # (test method, total data available, read buffer size, expected
945 # read size)
946 ("readinto", 10, 5, 5),
947 ("readinto", 10, 6, 6), # More than read1() can return
948 ("readinto", 5, 6, 5), # Buffer larger than total available
949 ("readinto", 6, 7, 6),
950 ("readinto", 10, 0, 0), # Empty buffer
951 ("readinto1", 10, 5, 5), # Result limited to single read1() call
952 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return
953 ("readinto1", 5, 6, 5), # Buffer larger than total available
954 ("readinto1", 6, 7, 5),
955 ("readinto1", 10, 0, 0), # Empty buffer
956 )
957 UNUSED_BYTE = 0x81
958 for test in tests:
959 with self.subTest(test):
960 method, avail, request, result = test
961 reader = Reader(bytes(range(avail)))
962 buffer = bytearray((UNUSED_BYTE,) * request)
963 method = getattr(reader, method)
964 self.assertEqual(method(buffer), result)
965 self.assertEqual(len(buffer), request)
966 self.assertSequenceEqual(buffer[:result], range(result))
967 unused = (UNUSED_BYTE,) * (request - result)
968 self.assertSequenceEqual(buffer[result:], unused)
969 self.assertEqual(len(reader.avail), avail - result)
970
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200971
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000972class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200973
974 def test_IOBase_finalize(self):
975 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
976 # class which inherits IOBase and an object of this class are caught
977 # in a reference cycle and close() is already in the method cache.
978 class MyIO(self.IOBase):
979 def close(self):
980 pass
981
982 # create an instance to populate the method cache
983 MyIO()
984 obj = MyIO()
985 obj.obj = obj
986 wr = weakref.ref(obj)
987 del MyIO
988 del obj
989 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300990 self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000991
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000992class PyIOTest(IOTest):
993 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000994
Guido van Rossuma9e20242007-03-08 00:43:48 +0000995
Gregory P. Smith1bef9072015-04-14 13:24:34 -0700996@support.cpython_only
997class APIMismatchTest(unittest.TestCase):
Gregory P. Smith054b0652015-04-14 12:58:05 -0700998
Gregory P. Smith054b0652015-04-14 12:58:05 -0700999 def test_RawIOBase_io_in_pyio_match(self):
1000 """Test that pyio RawIOBase class has all c RawIOBase methods"""
Antoine Pitrou45d61562015-05-20 21:50:59 +02001001 mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
1002 ignore=('__weakref__',))
Gregory P. Smith054b0652015-04-14 12:58:05 -07001003 self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
1004
1005 def test_RawIOBase_pyio_in_io_match(self):
1006 """Test that c RawIOBase class has all pyio RawIOBase methods"""
1007 mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
1008 self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
1009
1010
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001011class CommonBufferedTests:
1012 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
1013
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001014 def test_detach(self):
1015 raw = self.MockRawIO()
1016 buf = self.tp(raw)
1017 self.assertIs(buf.detach(), raw)
1018 self.assertRaises(ValueError, buf.detach)
1019
Benjamin Peterson10e76b62014-12-21 20:51:50 -06001020 repr(buf) # Should still work
1021
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001022 def test_fileno(self):
1023 rawio = self.MockRawIO()
1024 bufio = self.tp(rawio)
1025
Ezio Melottib3aedd42010-11-20 19:04:17 +00001026 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001027
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001028 def test_invalid_args(self):
1029 rawio = self.MockRawIO()
1030 bufio = self.tp(rawio)
1031 # Invalid whence
1032 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +02001033 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001034
1035 def test_override_destructor(self):
1036 tp = self.tp
1037 record = []
1038 class MyBufferedIO(tp):
1039 def __del__(self):
1040 record.append(1)
1041 try:
1042 f = super().__del__
1043 except AttributeError:
1044 pass
1045 else:
1046 f()
1047 def close(self):
1048 record.append(2)
1049 super().close()
1050 def flush(self):
1051 record.append(3)
1052 super().flush()
1053 rawio = self.MockRawIO()
1054 bufio = MyBufferedIO(rawio)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001055 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001056 support.gc_collect()
Martin Panter754aab22016-03-31 07:21:56 +00001057 self.assertEqual(record, [1, 2, 3])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001058
1059 def test_context_manager(self):
1060 # Test usability as a context manager
1061 rawio = self.MockRawIO()
1062 bufio = self.tp(rawio)
1063 def _with():
1064 with bufio:
1065 pass
1066 _with()
1067 # bufio should now be closed, and using it a second time should raise
1068 # a ValueError.
1069 self.assertRaises(ValueError, _with)
1070
1071 def test_error_through_destructor(self):
1072 # Test that the exception state is not modified by a destructor,
1073 # even if close() fails.
1074 rawio = self.CloseFailureIO()
1075 def f():
1076 self.tp(rawio).xyzzy
1077 with support.captured_output("stderr") as s:
1078 self.assertRaises(AttributeError, f)
1079 s = s.getvalue().strip()
1080 if s:
1081 # The destructor *may* have printed an unraisable error, check it
1082 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001083 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001084 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +00001085
Antoine Pitrou716c4442009-05-23 19:04:03 +00001086 def test_repr(self):
1087 raw = self.MockRawIO()
1088 b = self.tp(raw)
Serhiy Storchaka521e5862014-07-22 15:00:37 +03001089 clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001090 self.assertEqual(repr(b), "<%s>" % clsname)
1091 raw.name = "dummy"
1092 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
1093 raw.name = b"dummy"
1094 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
1095
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02001096 def test_recursive_repr(self):
1097 # Issue #25455
1098 raw = self.MockRawIO()
1099 b = self.tp(raw)
1100 with support.swap_attr(raw, 'name', b):
1101 try:
1102 repr(b) # Should not crash
1103 except RuntimeError:
1104 pass
1105
Antoine Pitrou6be88762010-05-03 16:48:20 +00001106 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001107 # Test that buffered file is closed despite failed flush
1108 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00001109 raw = self.MockRawIO()
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001110 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00001111 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001112 closed[:] = [b.closed, raw.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001113 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00001114 raw.flush = bad_flush
1115 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001116 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001117 self.assertTrue(b.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001118 self.assertTrue(raw.closed)
1119 self.assertTrue(closed) # flush() called
1120 self.assertFalse(closed[0]) # flush() called before file closed
1121 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02001122 raw.flush = lambda: None # break reference loop
Benjamin Peterson68623612012-12-20 11:53:11 -06001123
1124 def test_close_error_on_close(self):
1125 raw = self.MockRawIO()
1126 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001127 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -06001128 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001129 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -06001130 raw.close = bad_close
1131 b = self.tp(raw)
1132 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001133 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001134 b.close()
1135 self.assertEqual(err.exception.args, ('close',))
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001136 self.assertIsInstance(err.exception.__context__, OSError)
Benjamin Peterson68623612012-12-20 11:53:11 -06001137 self.assertEqual(err.exception.__context__.args, ('flush',))
1138 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00001139
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001140 def test_nonnormalized_close_error_on_close(self):
1141 # Issue #21677
1142 raw = self.MockRawIO()
1143 def bad_flush():
1144 raise non_existing_flush
1145 def bad_close():
1146 raise non_existing_close
1147 raw.close = bad_close
1148 b = self.tp(raw)
1149 b.flush = bad_flush
1150 with self.assertRaises(NameError) as err: # exception not swallowed
1151 b.close()
1152 self.assertIn('non_existing_close', str(err.exception))
1153 self.assertIsInstance(err.exception.__context__, NameError)
1154 self.assertIn('non_existing_flush', str(err.exception.__context__))
1155 self.assertFalse(b.closed)
1156
Antoine Pitrou6be88762010-05-03 16:48:20 +00001157 def test_multi_close(self):
1158 raw = self.MockRawIO()
1159 b = self.tp(raw)
1160 b.close()
1161 b.close()
1162 b.close()
1163 self.assertRaises(ValueError, b.flush)
1164
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001165 def test_unseekable(self):
1166 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1167 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1168 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1169
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00001170 def test_readonly_attributes(self):
1171 raw = self.MockRawIO()
1172 buf = self.tp(raw)
1173 x = self.MockRawIO()
1174 with self.assertRaises(AttributeError):
1175 buf.raw = x
1176
Guido van Rossum78892e42007-04-06 17:31:18 +00001177
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001178class SizeofTest:
1179
1180 @support.cpython_only
1181 def test_sizeof(self):
1182 bufsize1 = 4096
1183 bufsize2 = 8192
1184 rawio = self.MockRawIO()
1185 bufio = self.tp(rawio, buffer_size=bufsize1)
1186 size = sys.getsizeof(bufio) - bufsize1
1187 rawio = self.MockRawIO()
1188 bufio = self.tp(rawio, buffer_size=bufsize2)
1189 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
1190
Jesus Ceadc469452012-10-04 12:37:56 +02001191 @support.cpython_only
1192 def test_buffer_freeing(self) :
1193 bufsize = 4096
1194 rawio = self.MockRawIO()
1195 bufio = self.tp(rawio, buffer_size=bufsize)
1196 size = sys.getsizeof(bufio) - bufsize
1197 bufio.close()
1198 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001199
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001200class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1201 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001202
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001203 def test_constructor(self):
1204 rawio = self.MockRawIO([b"abc"])
1205 bufio = self.tp(rawio)
1206 bufio.__init__(rawio)
1207 bufio.__init__(rawio, buffer_size=1024)
1208 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001209 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001210 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1211 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1212 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1213 rawio = self.MockRawIO([b"abc"])
1214 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001215 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001216
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001217 def test_uninitialized(self):
1218 bufio = self.tp.__new__(self.tp)
1219 del bufio
1220 bufio = self.tp.__new__(self.tp)
1221 self.assertRaisesRegex((ValueError, AttributeError),
1222 'uninitialized|has no attribute',
1223 bufio.read, 0)
1224 bufio.__init__(self.MockRawIO())
1225 self.assertEqual(bufio.read(0), b'')
1226
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001227 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001228 for arg in (None, 7):
1229 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1230 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001231 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001232 # Invalid args
1233 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001234
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001235 def test_read1(self):
1236 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1237 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001238 self.assertEqual(b"a", bufio.read(1))
1239 self.assertEqual(b"b", bufio.read1(1))
1240 self.assertEqual(rawio._reads, 1)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001241 self.assertEqual(b"", bufio.read1(0))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001242 self.assertEqual(b"c", bufio.read1(100))
1243 self.assertEqual(rawio._reads, 1)
1244 self.assertEqual(b"d", bufio.read1(100))
1245 self.assertEqual(rawio._reads, 2)
1246 self.assertEqual(b"efg", bufio.read1(100))
1247 self.assertEqual(rawio._reads, 3)
1248 self.assertEqual(b"", bufio.read1(100))
1249 self.assertEqual(rawio._reads, 4)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001250
1251 def test_read1_arbitrary(self):
1252 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1253 bufio = self.tp(rawio)
1254 self.assertEqual(b"a", bufio.read(1))
1255 self.assertEqual(b"bc", bufio.read1())
1256 self.assertEqual(b"d", bufio.read1())
1257 self.assertEqual(b"efg", bufio.read1(-1))
1258 self.assertEqual(rawio._reads, 3)
1259 self.assertEqual(b"", bufio.read1())
1260 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001261
1262 def test_readinto(self):
1263 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1264 bufio = self.tp(rawio)
1265 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001266 self.assertEqual(bufio.readinto(b), 2)
1267 self.assertEqual(b, b"ab")
1268 self.assertEqual(bufio.readinto(b), 2)
1269 self.assertEqual(b, b"cd")
1270 self.assertEqual(bufio.readinto(b), 2)
1271 self.assertEqual(b, b"ef")
1272 self.assertEqual(bufio.readinto(b), 1)
1273 self.assertEqual(b, b"gf")
1274 self.assertEqual(bufio.readinto(b), 0)
1275 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +02001276 rawio = self.MockRawIO((b"abc", None))
1277 bufio = self.tp(rawio)
1278 self.assertEqual(bufio.readinto(b), 2)
1279 self.assertEqual(b, b"ab")
1280 self.assertEqual(bufio.readinto(b), 1)
1281 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001282
Benjamin Petersona96fea02014-06-22 14:17:44 -07001283 def test_readinto1(self):
1284 buffer_size = 10
1285 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1286 bufio = self.tp(rawio, buffer_size=buffer_size)
1287 b = bytearray(2)
1288 self.assertEqual(bufio.peek(3), b'abc')
1289 self.assertEqual(rawio._reads, 1)
1290 self.assertEqual(bufio.readinto1(b), 2)
1291 self.assertEqual(b, b"ab")
1292 self.assertEqual(rawio._reads, 1)
1293 self.assertEqual(bufio.readinto1(b), 1)
1294 self.assertEqual(b[:1], b"c")
1295 self.assertEqual(rawio._reads, 1)
1296 self.assertEqual(bufio.readinto1(b), 2)
1297 self.assertEqual(b, b"de")
1298 self.assertEqual(rawio._reads, 2)
1299 b = bytearray(2*buffer_size)
1300 self.assertEqual(bufio.peek(3), b'fgh')
1301 self.assertEqual(rawio._reads, 3)
1302 self.assertEqual(bufio.readinto1(b), 6)
1303 self.assertEqual(b[:6], b"fghjkl")
1304 self.assertEqual(rawio._reads, 4)
1305
1306 def test_readinto_array(self):
1307 buffer_size = 60
1308 data = b"a" * 26
1309 rawio = self.MockRawIO((data,))
1310 bufio = self.tp(rawio, buffer_size=buffer_size)
1311
1312 # Create an array with element size > 1 byte
1313 b = array.array('i', b'x' * 32)
1314 assert len(b) != 16
1315
1316 # Read into it. We should get as many *bytes* as we can fit into b
1317 # (which is more than the number of elements)
1318 n = bufio.readinto(b)
1319 self.assertGreater(n, len(b))
1320
1321 # Check that old contents of b are preserved
1322 bm = memoryview(b).cast('B')
1323 self.assertLess(n, len(bm))
1324 self.assertEqual(bm[:n], data[:n])
1325 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1326
1327 def test_readinto1_array(self):
1328 buffer_size = 60
1329 data = b"a" * 26
1330 rawio = self.MockRawIO((data,))
1331 bufio = self.tp(rawio, buffer_size=buffer_size)
1332
1333 # Create an array with element size > 1 byte
1334 b = array.array('i', b'x' * 32)
1335 assert len(b) != 16
1336
1337 # Read into it. We should get as many *bytes* as we can fit into b
1338 # (which is more than the number of elements)
1339 n = bufio.readinto1(b)
1340 self.assertGreater(n, len(b))
1341
1342 # Check that old contents of b are preserved
1343 bm = memoryview(b).cast('B')
1344 self.assertLess(n, len(bm))
1345 self.assertEqual(bm[:n], data[:n])
1346 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1347
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001348 def test_readlines(self):
1349 def bufio():
1350 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1351 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001352 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1353 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1354 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001355
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001356 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001357 data = b"abcdefghi"
1358 dlen = len(data)
1359
1360 tests = [
1361 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1362 [ 100, [ 3, 3, 3], [ dlen ] ],
1363 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1364 ]
1365
1366 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001367 rawio = self.MockFileIO(data)
1368 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001369 pos = 0
1370 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001371 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001372 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001373 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001374 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001375
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001376 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +00001377 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001378 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1379 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001380 self.assertEqual(b"abcd", bufio.read(6))
1381 self.assertEqual(b"e", bufio.read(1))
1382 self.assertEqual(b"fg", bufio.read())
1383 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +02001384 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001385 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +00001386
Victor Stinnera80987f2011-05-25 22:47:16 +02001387 rawio = self.MockRawIO((b"a", None, None))
1388 self.assertEqual(b"a", rawio.readall())
1389 self.assertIsNone(rawio.readall())
1390
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001391 def test_read_past_eof(self):
1392 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1393 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001394
Ezio Melottib3aedd42010-11-20 19:04:17 +00001395 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001396
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001397 def test_read_all(self):
1398 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1399 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001400
Ezio Melottib3aedd42010-11-20 19:04:17 +00001401 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001402
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001403 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001404 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001405 try:
1406 # Write out many bytes with exactly the same number of 0's,
1407 # 1's... 255's. This will help us check that concurrent reading
1408 # doesn't duplicate or forget contents.
1409 N = 1000
1410 l = list(range(256)) * N
1411 random.shuffle(l)
1412 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001413 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +00001414 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001415 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001416 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001417 errors = []
1418 results = []
1419 def f():
1420 try:
1421 # Intra-buffer read then buffer-flushing read
1422 for n in cycle([1, 19]):
1423 s = bufio.read(n)
1424 if not s:
1425 break
1426 # list.append() is atomic
1427 results.append(s)
1428 except Exception as e:
1429 errors.append(e)
1430 raise
1431 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001432 with support.start_threads(threads):
1433 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001434 self.assertFalse(errors,
1435 "the following exceptions were caught: %r" % errors)
1436 s = b''.join(results)
1437 for i in range(256):
1438 c = bytes(bytearray([i]))
1439 self.assertEqual(s.count(c), N)
1440 finally:
1441 support.unlink(support.TESTFN)
1442
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001443 def test_unseekable(self):
1444 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1445 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1446 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1447 bufio.read(1)
1448 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1449 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1450
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001451 def test_misbehaved_io(self):
1452 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1453 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001454 self.assertRaises(OSError, bufio.seek, 0)
1455 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001456
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001457 def test_no_extraneous_read(self):
1458 # Issue #9550; when the raw IO object has satisfied the read request,
1459 # we should not issue any additional reads, otherwise it may block
1460 # (e.g. socket).
1461 bufsize = 16
1462 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1463 rawio = self.MockRawIO([b"x" * n])
1464 bufio = self.tp(rawio, bufsize)
1465 self.assertEqual(bufio.read(n), b"x" * n)
1466 # Simple case: one raw read is enough to satisfy the request.
1467 self.assertEqual(rawio._extraneous_reads, 0,
1468 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1469 # A more complex case where two raw reads are needed to satisfy
1470 # the request.
1471 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1472 bufio = self.tp(rawio, bufsize)
1473 self.assertEqual(bufio.read(n), b"x" * n)
1474 self.assertEqual(rawio._extraneous_reads, 0,
1475 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1476
Berker Peksagd10d6ae2015-05-12 17:01:05 +03001477 def test_read_on_closed(self):
1478 # Issue #23796
1479 b = io.BufferedReader(io.BytesIO(b"12"))
1480 b.read(1)
1481 b.close()
1482 self.assertRaises(ValueError, b.peek)
1483 self.assertRaises(ValueError, b.read1, 1)
1484
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001485
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001486class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001487 tp = io.BufferedReader
1488
1489 def test_constructor(self):
1490 BufferedReaderTest.test_constructor(self)
1491 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08001492 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001493 if sys.maxsize > 0x7FFFFFFF:
1494 rawio = self.MockRawIO()
1495 bufio = self.tp(rawio)
1496 self.assertRaises((OverflowError, MemoryError, ValueError),
1497 bufio.__init__, rawio, sys.maxsize)
1498
1499 def test_initialization(self):
1500 rawio = self.MockRawIO([b"abc"])
1501 bufio = self.tp(rawio)
1502 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1503 self.assertRaises(ValueError, bufio.read)
1504 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1505 self.assertRaises(ValueError, bufio.read)
1506 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1507 self.assertRaises(ValueError, bufio.read)
1508
1509 def test_misbehaved_io_read(self):
1510 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1511 bufio = self.tp(rawio)
1512 # _pyio.BufferedReader seems to implement reading different, so that
1513 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001514 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001515
1516 def test_garbage_collection(self):
1517 # C BufferedReader objects are collected.
1518 # The Python version has __del__, so it ends into gc.garbage instead
Serhiy Storchakae36837c2018-06-04 05:53:56 +03001519 self.addCleanup(support.unlink, support.TESTFN)
Antoine Pitrou796564c2013-07-30 19:59:21 +02001520 with support.check_warnings(('', ResourceWarning)):
1521 rawio = self.FileIO(support.TESTFN, "w+b")
1522 f = self.tp(rawio)
1523 f.f = f
1524 wr = weakref.ref(f)
1525 del f
1526 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001527 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001528
R David Murray67bfe802013-02-23 21:51:05 -05001529 def test_args_error(self):
1530 # Issue #17275
1531 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1532 self.tp(io.BytesIO(), 1024, 1024, 1024)
1533
1534
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001535class PyBufferedReaderTest(BufferedReaderTest):
1536 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001537
Guido van Rossuma9e20242007-03-08 00:43:48 +00001538
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001539class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1540 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001541
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001542 def test_constructor(self):
1543 rawio = self.MockRawIO()
1544 bufio = self.tp(rawio)
1545 bufio.__init__(rawio)
1546 bufio.__init__(rawio, buffer_size=1024)
1547 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001548 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001549 bufio.flush()
1550 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1551 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1552 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1553 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001554 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001555 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001556 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001557
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001558 def test_uninitialized(self):
1559 bufio = self.tp.__new__(self.tp)
1560 del bufio
1561 bufio = self.tp.__new__(self.tp)
1562 self.assertRaisesRegex((ValueError, AttributeError),
1563 'uninitialized|has no attribute',
1564 bufio.write, b'')
1565 bufio.__init__(self.MockRawIO())
1566 self.assertEqual(bufio.write(b''), 0)
1567
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001568 def test_detach_flush(self):
1569 raw = self.MockRawIO()
1570 buf = self.tp(raw)
1571 buf.write(b"howdy!")
1572 self.assertFalse(raw._write_stack)
1573 buf.detach()
1574 self.assertEqual(raw._write_stack, [b"howdy!"])
1575
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001576 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001577 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001578 writer = self.MockRawIO()
1579 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001580 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001581 self.assertFalse(writer._write_stack)
Martin Panter6bb91f32016-05-28 00:41:57 +00001582 buffer = bytearray(b"def")
1583 bufio.write(buffer)
1584 buffer[:] = b"***" # Overwrite our copy of the data
1585 bufio.flush()
1586 self.assertEqual(b"".join(writer._write_stack), b"abcdef")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001587
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001588 def test_write_overflow(self):
1589 writer = self.MockRawIO()
1590 bufio = self.tp(writer, 8)
1591 contents = b"abcdefghijklmnop"
1592 for n in range(0, len(contents), 3):
1593 bufio.write(contents[n:n+3])
1594 flushed = b"".join(writer._write_stack)
1595 # At least (total - 8) bytes were implicitly flushed, perhaps more
1596 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001597 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001598
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001599 def check_writes(self, intermediate_func):
1600 # Lots of writes, test the flushed output is as expected.
1601 contents = bytes(range(256)) * 1000
1602 n = 0
1603 writer = self.MockRawIO()
1604 bufio = self.tp(writer, 13)
1605 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1606 def gen_sizes():
1607 for size in count(1):
1608 for i in range(15):
1609 yield size
1610 sizes = gen_sizes()
1611 while n < len(contents):
1612 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001613 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001614 intermediate_func(bufio)
1615 n += size
1616 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001617 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001618
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001619 def test_writes(self):
1620 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001621
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001622 def test_writes_and_flushes(self):
1623 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001624
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001625 def test_writes_and_seeks(self):
1626 def _seekabs(bufio):
1627 pos = bufio.tell()
1628 bufio.seek(pos + 1, 0)
1629 bufio.seek(pos - 1, 0)
1630 bufio.seek(pos, 0)
1631 self.check_writes(_seekabs)
1632 def _seekrel(bufio):
1633 pos = bufio.seek(0, 1)
1634 bufio.seek(+1, 1)
1635 bufio.seek(-1, 1)
1636 bufio.seek(pos, 0)
1637 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001638
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001639 def test_writes_and_truncates(self):
1640 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001641
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001642 def test_write_non_blocking(self):
1643 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001644 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001645
Ezio Melottib3aedd42010-11-20 19:04:17 +00001646 self.assertEqual(bufio.write(b"abcd"), 4)
1647 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001648 # 1 byte will be written, the rest will be buffered
1649 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001650 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001651
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001652 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1653 raw.block_on(b"0")
1654 try:
1655 bufio.write(b"opqrwxyz0123456789")
1656 except self.BlockingIOError as e:
1657 written = e.characters_written
1658 else:
1659 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001660 self.assertEqual(written, 16)
1661 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001662 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001663
Ezio Melottib3aedd42010-11-20 19:04:17 +00001664 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001665 s = raw.pop_written()
1666 # Previously buffered bytes were flushed
1667 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001668
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001669 def test_write_and_rewind(self):
1670 raw = io.BytesIO()
1671 bufio = self.tp(raw, 4)
1672 self.assertEqual(bufio.write(b"abcdef"), 6)
1673 self.assertEqual(bufio.tell(), 6)
1674 bufio.seek(0, 0)
1675 self.assertEqual(bufio.write(b"XY"), 2)
1676 bufio.seek(6, 0)
1677 self.assertEqual(raw.getvalue(), b"XYcdef")
1678 self.assertEqual(bufio.write(b"123456"), 6)
1679 bufio.flush()
1680 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001681
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001682 def test_flush(self):
1683 writer = self.MockRawIO()
1684 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001685 bufio.write(b"abc")
1686 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001687 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001688
Antoine Pitrou131a4892012-10-16 22:57:11 +02001689 def test_writelines(self):
1690 l = [b'ab', b'cd', b'ef']
1691 writer = self.MockRawIO()
1692 bufio = self.tp(writer, 8)
1693 bufio.writelines(l)
1694 bufio.flush()
1695 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1696
1697 def test_writelines_userlist(self):
1698 l = UserList([b'ab', b'cd', b'ef'])
1699 writer = self.MockRawIO()
1700 bufio = self.tp(writer, 8)
1701 bufio.writelines(l)
1702 bufio.flush()
1703 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1704
1705 def test_writelines_error(self):
1706 writer = self.MockRawIO()
1707 bufio = self.tp(writer, 8)
1708 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1709 self.assertRaises(TypeError, bufio.writelines, None)
1710 self.assertRaises(TypeError, bufio.writelines, 'abc')
1711
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001712 def test_destructor(self):
1713 writer = self.MockRawIO()
1714 bufio = self.tp(writer, 8)
1715 bufio.write(b"abc")
1716 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001717 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001718 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001719
1720 def test_truncate(self):
1721 # Truncate implicitly flushes the buffer.
Serhiy Storchakae36837c2018-06-04 05:53:56 +03001722 self.addCleanup(support.unlink, support.TESTFN)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001723 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001724 bufio = self.tp(raw, 8)
1725 bufio.write(b"abcdef")
1726 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001727 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001728 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001729 self.assertEqual(f.read(), b"abc")
1730
Nitish Chandra059f58c2018-01-28 21:30:09 +05301731 def test_truncate_after_write(self):
1732 # Ensure that truncate preserves the file position after
1733 # writes longer than the buffer size.
1734 # Issue: https://bugs.python.org/issue32228
Serhiy Storchakae36837c2018-06-04 05:53:56 +03001735 self.addCleanup(support.unlink, support.TESTFN)
Nitish Chandra059f58c2018-01-28 21:30:09 +05301736 with self.open(support.TESTFN, "wb") as f:
1737 # Fill with some buffer
1738 f.write(b'\x00' * 10000)
1739 buffer_sizes = [8192, 4096, 200]
1740 for buffer_size in buffer_sizes:
1741 with self.open(support.TESTFN, "r+b", buffering=buffer_size) as f:
1742 f.write(b'\x00' * (buffer_size + 1))
1743 # After write write_pos and write_end are set to 0
1744 f.read(1)
1745 # read operation makes sure that pos != raw_pos
1746 f.truncate()
1747 self.assertEqual(f.tell(), buffer_size + 2)
1748
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001749 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001750 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001751 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001752 # Write out many bytes from many threads and test they were
1753 # all flushed.
1754 N = 1000
1755 contents = bytes(range(256)) * N
1756 sizes = cycle([1, 19])
1757 n = 0
1758 queue = deque()
1759 while n < len(contents):
1760 size = next(sizes)
1761 queue.append(contents[n:n+size])
1762 n += size
1763 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001764 # We use a real file object because it allows us to
1765 # exercise situations where the GIL is released before
1766 # writing the buffer to the raw streams. This is in addition
1767 # to concurrency issues due to switching threads in the middle
1768 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001769 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001770 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001771 errors = []
1772 def f():
1773 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001774 while True:
1775 try:
1776 s = queue.popleft()
1777 except IndexError:
1778 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001779 bufio.write(s)
1780 except Exception as e:
1781 errors.append(e)
1782 raise
1783 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001784 with support.start_threads(threads):
1785 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001786 self.assertFalse(errors,
1787 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001788 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001789 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001790 s = f.read()
1791 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001792 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001793 finally:
1794 support.unlink(support.TESTFN)
1795
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001796 def test_misbehaved_io(self):
1797 rawio = self.MisbehavedRawIO()
1798 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001799 self.assertRaises(OSError, bufio.seek, 0)
1800 self.assertRaises(OSError, bufio.tell)
1801 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001802
Florent Xicluna109d5732012-07-07 17:03:22 +02001803 def test_max_buffer_size_removal(self):
1804 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001805 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001806
Benjamin Peterson68623612012-12-20 11:53:11 -06001807 def test_write_error_on_close(self):
1808 raw = self.MockRawIO()
1809 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001810 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001811 raw.write = bad_write
1812 b = self.tp(raw)
1813 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001814 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001815 self.assertTrue(b.closed)
1816
benfogle9703f092017-11-10 16:03:40 -05001817 def test_slow_close_from_thread(self):
1818 # Issue #31976
1819 rawio = self.SlowFlushRawIO()
1820 bufio = self.tp(rawio, 8)
1821 t = threading.Thread(target=bufio.close)
1822 t.start()
1823 rawio.in_flush.wait()
1824 self.assertRaises(ValueError, bufio.write, b'spam')
1825 self.assertTrue(bufio.closed)
1826 t.join()
1827
1828
Benjamin Peterson59406a92009-03-26 17:10:29 +00001829
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001830class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001831 tp = io.BufferedWriter
1832
1833 def test_constructor(self):
1834 BufferedWriterTest.test_constructor(self)
1835 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08001836 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001837 if sys.maxsize > 0x7FFFFFFF:
1838 rawio = self.MockRawIO()
1839 bufio = self.tp(rawio)
1840 self.assertRaises((OverflowError, MemoryError, ValueError),
1841 bufio.__init__, rawio, sys.maxsize)
1842
1843 def test_initialization(self):
1844 rawio = self.MockRawIO()
1845 bufio = self.tp(rawio)
1846 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1847 self.assertRaises(ValueError, bufio.write, b"def")
1848 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1849 self.assertRaises(ValueError, bufio.write, b"def")
1850 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1851 self.assertRaises(ValueError, bufio.write, b"def")
1852
1853 def test_garbage_collection(self):
1854 # C BufferedWriter objects are collected, and collecting them flushes
1855 # all data to disk.
1856 # The Python version has __del__, so it ends into gc.garbage instead
Serhiy Storchakae36837c2018-06-04 05:53:56 +03001857 self.addCleanup(support.unlink, support.TESTFN)
Antoine Pitrou796564c2013-07-30 19:59:21 +02001858 with support.check_warnings(('', ResourceWarning)):
1859 rawio = self.FileIO(support.TESTFN, "w+b")
1860 f = self.tp(rawio)
1861 f.write(b"123xxx")
1862 f.x = f
1863 wr = weakref.ref(f)
1864 del f
1865 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001866 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001867 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001868 self.assertEqual(f.read(), b"123xxx")
1869
R David Murray67bfe802013-02-23 21:51:05 -05001870 def test_args_error(self):
1871 # Issue #17275
1872 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1873 self.tp(io.BytesIO(), 1024, 1024, 1024)
1874
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001875
1876class PyBufferedWriterTest(BufferedWriterTest):
1877 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001878
Guido van Rossum01a27522007-03-07 01:00:12 +00001879class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001880
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001881 def test_constructor(self):
1882 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001883 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001884
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001885 def test_uninitialized(self):
1886 pair = self.tp.__new__(self.tp)
1887 del pair
1888 pair = self.tp.__new__(self.tp)
1889 self.assertRaisesRegex((ValueError, AttributeError),
1890 'uninitialized|has no attribute',
1891 pair.read, 0)
1892 self.assertRaisesRegex((ValueError, AttributeError),
1893 'uninitialized|has no attribute',
1894 pair.write, b'')
1895 pair.__init__(self.MockRawIO(), self.MockRawIO())
1896 self.assertEqual(pair.read(0), b'')
1897 self.assertEqual(pair.write(b''), 0)
1898
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001899 def test_detach(self):
1900 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1901 self.assertRaises(self.UnsupportedOperation, pair.detach)
1902
Florent Xicluna109d5732012-07-07 17:03:22 +02001903 def test_constructor_max_buffer_size_removal(self):
1904 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001905 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001906
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001907 def test_constructor_with_not_readable(self):
1908 class NotReadable(MockRawIO):
1909 def readable(self):
1910 return False
1911
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001912 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001913
1914 def test_constructor_with_not_writeable(self):
1915 class NotWriteable(MockRawIO):
1916 def writable(self):
1917 return False
1918
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001919 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001920
1921 def test_read(self):
1922 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1923
1924 self.assertEqual(pair.read(3), b"abc")
1925 self.assertEqual(pair.read(1), b"d")
1926 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001927 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1928 self.assertEqual(pair.read(None), b"abc")
1929
1930 def test_readlines(self):
1931 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1932 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1933 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1934 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001935
1936 def test_read1(self):
1937 # .read1() is delegated to the underlying reader object, so this test
1938 # can be shallow.
1939 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1940
1941 self.assertEqual(pair.read1(3), b"abc")
Martin Panterccb2c0e2016-10-20 23:48:14 +00001942 self.assertEqual(pair.read1(), b"def")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001943
1944 def test_readinto(self):
Martin Panter6bb91f32016-05-28 00:41:57 +00001945 for method in ("readinto", "readinto1"):
1946 with self.subTest(method):
1947 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001948
Serhiy Storchaka5f1a5182016-09-11 14:41:02 +03001949 data = byteslike(b'\0' * 5)
Martin Panter6bb91f32016-05-28 00:41:57 +00001950 self.assertEqual(getattr(pair, method)(data), 5)
1951 self.assertEqual(bytes(data), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001952
1953 def test_write(self):
1954 w = self.MockRawIO()
1955 pair = self.tp(self.MockRawIO(), w)
1956
1957 pair.write(b"abc")
1958 pair.flush()
Martin Panter6bb91f32016-05-28 00:41:57 +00001959 buffer = bytearray(b"def")
1960 pair.write(buffer)
1961 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001962 pair.flush()
1963 self.assertEqual(w._write_stack, [b"abc", b"def"])
1964
1965 def test_peek(self):
1966 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1967
1968 self.assertTrue(pair.peek(3).startswith(b"abc"))
1969 self.assertEqual(pair.read(3), b"abc")
1970
1971 def test_readable(self):
1972 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1973 self.assertTrue(pair.readable())
1974
1975 def test_writeable(self):
1976 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1977 self.assertTrue(pair.writable())
1978
1979 def test_seekable(self):
1980 # BufferedRWPairs are never seekable, even if their readers and writers
1981 # are.
1982 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1983 self.assertFalse(pair.seekable())
1984
1985 # .flush() is delegated to the underlying writer object and has been
1986 # tested in the test_write method.
1987
1988 def test_close_and_closed(self):
1989 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1990 self.assertFalse(pair.closed)
1991 pair.close()
1992 self.assertTrue(pair.closed)
1993
Serhiy Storchaka7665be62015-03-24 23:21:57 +02001994 def test_reader_close_error_on_close(self):
1995 def reader_close():
1996 reader_non_existing
1997 reader = self.MockRawIO()
1998 reader.close = reader_close
1999 writer = self.MockRawIO()
2000 pair = self.tp(reader, writer)
2001 with self.assertRaises(NameError) as err:
2002 pair.close()
2003 self.assertIn('reader_non_existing', str(err.exception))
2004 self.assertTrue(pair.closed)
2005 self.assertFalse(reader.closed)
2006 self.assertTrue(writer.closed)
2007
2008 def test_writer_close_error_on_close(self):
2009 def writer_close():
2010 writer_non_existing
2011 reader = self.MockRawIO()
2012 writer = self.MockRawIO()
2013 writer.close = writer_close
2014 pair = self.tp(reader, writer)
2015 with self.assertRaises(NameError) as err:
2016 pair.close()
2017 self.assertIn('writer_non_existing', str(err.exception))
2018 self.assertFalse(pair.closed)
2019 self.assertTrue(reader.closed)
2020 self.assertFalse(writer.closed)
2021
2022 def test_reader_writer_close_error_on_close(self):
2023 def reader_close():
2024 reader_non_existing
2025 def writer_close():
2026 writer_non_existing
2027 reader = self.MockRawIO()
2028 reader.close = reader_close
2029 writer = self.MockRawIO()
2030 writer.close = writer_close
2031 pair = self.tp(reader, writer)
2032 with self.assertRaises(NameError) as err:
2033 pair.close()
2034 self.assertIn('reader_non_existing', str(err.exception))
2035 self.assertIsInstance(err.exception.__context__, NameError)
2036 self.assertIn('writer_non_existing', str(err.exception.__context__))
2037 self.assertFalse(pair.closed)
2038 self.assertFalse(reader.closed)
2039 self.assertFalse(writer.closed)
2040
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002041 def test_isatty(self):
2042 class SelectableIsAtty(MockRawIO):
2043 def __init__(self, isatty):
2044 MockRawIO.__init__(self)
2045 self._isatty = isatty
2046
2047 def isatty(self):
2048 return self._isatty
2049
2050 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
2051 self.assertFalse(pair.isatty())
2052
2053 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
2054 self.assertTrue(pair.isatty())
2055
2056 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
2057 self.assertTrue(pair.isatty())
2058
2059 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
2060 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00002061
Benjamin Petersonbbd0a322014-09-29 22:46:57 -04002062 def test_weakref_clearing(self):
2063 brw = self.tp(self.MockRawIO(), self.MockRawIO())
2064 ref = weakref.ref(brw)
2065 brw = None
2066 ref = None # Shouldn't segfault.
2067
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002068class CBufferedRWPairTest(BufferedRWPairTest):
2069 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002070
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002071class PyBufferedRWPairTest(BufferedRWPairTest):
2072 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002073
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002074
2075class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
2076 read_mode = "rb+"
2077 write_mode = "wb+"
2078
2079 def test_constructor(self):
2080 BufferedReaderTest.test_constructor(self)
2081 BufferedWriterTest.test_constructor(self)
2082
Serhiy Storchaka61e24932014-02-12 10:52:35 +02002083 def test_uninitialized(self):
2084 BufferedReaderTest.test_uninitialized(self)
2085 BufferedWriterTest.test_uninitialized(self)
2086
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002087 def test_read_and_write(self):
2088 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00002089 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00002090
2091 self.assertEqual(b"as", rw.read(2))
2092 rw.write(b"ddd")
2093 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002094 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002095 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002096 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00002097
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002098 def test_seek_and_tell(self):
2099 raw = self.BytesIO(b"asdfghjkl")
2100 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00002101
Ezio Melottib3aedd42010-11-20 19:04:17 +00002102 self.assertEqual(b"as", rw.read(2))
2103 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002104 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002105 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00002106
Antoine Pitroue05565e2011-08-20 14:39:23 +02002107 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00002108 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02002109 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002110 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002111 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002112 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002113 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002114 self.assertEqual(7, rw.tell())
2115 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02002116 rw.flush()
2117 self.assertEqual(b"asdf123fl", raw.getvalue())
2118
Christian Heimes8e42a0a2007-11-08 18:04:45 +00002119 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00002120
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002121 def check_flush_and_read(self, read_func):
2122 raw = self.BytesIO(b"abcdefghi")
2123 bufio = self.tp(raw)
2124
Ezio Melottib3aedd42010-11-20 19:04:17 +00002125 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002126 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002127 self.assertEqual(b"ef", read_func(bufio, 2))
2128 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002129 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002130 self.assertEqual(6, bufio.tell())
2131 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002132 raw.seek(0, 0)
2133 raw.write(b"XYZ")
2134 # flush() resets the read buffer
2135 bufio.flush()
2136 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002137 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002138
2139 def test_flush_and_read(self):
2140 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2141
2142 def test_flush_and_readinto(self):
2143 def _readinto(bufio, n=-1):
2144 b = bytearray(n if n >= 0 else 9999)
2145 n = bufio.readinto(b)
2146 return bytes(b[:n])
2147 self.check_flush_and_read(_readinto)
2148
2149 def test_flush_and_peek(self):
2150 def _peek(bufio, n=-1):
2151 # This relies on the fact that the buffer can contain the whole
2152 # raw stream, otherwise peek() can return less.
2153 b = bufio.peek(n)
2154 if n != -1:
2155 b = b[:n]
2156 bufio.seek(len(b), 1)
2157 return b
2158 self.check_flush_and_read(_peek)
2159
2160 def test_flush_and_write(self):
2161 raw = self.BytesIO(b"abcdefghi")
2162 bufio = self.tp(raw)
2163
2164 bufio.write(b"123")
2165 bufio.flush()
2166 bufio.write(b"45")
2167 bufio.flush()
2168 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002169 self.assertEqual(b"12345fghi", raw.getvalue())
2170 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002171
2172 def test_threads(self):
2173 BufferedReaderTest.test_threads(self)
2174 BufferedWriterTest.test_threads(self)
2175
2176 def test_writes_and_peek(self):
2177 def _peek(bufio):
2178 bufio.peek(1)
2179 self.check_writes(_peek)
2180 def _peek(bufio):
2181 pos = bufio.tell()
2182 bufio.seek(-1, 1)
2183 bufio.peek(1)
2184 bufio.seek(pos, 0)
2185 self.check_writes(_peek)
2186
2187 def test_writes_and_reads(self):
2188 def _read(bufio):
2189 bufio.seek(-1, 1)
2190 bufio.read(1)
2191 self.check_writes(_read)
2192
2193 def test_writes_and_read1s(self):
2194 def _read1(bufio):
2195 bufio.seek(-1, 1)
2196 bufio.read1(1)
2197 self.check_writes(_read1)
2198
2199 def test_writes_and_readintos(self):
2200 def _read(bufio):
2201 bufio.seek(-1, 1)
2202 bufio.readinto(bytearray(1))
2203 self.check_writes(_read)
2204
Antoine Pitroua0ceb732009-08-06 20:29:56 +00002205 def test_write_after_readahead(self):
2206 # Issue #6629: writing after the buffer was filled by readahead should
2207 # first rewind the raw stream.
2208 for overwrite_size in [1, 5]:
2209 raw = self.BytesIO(b"A" * 10)
2210 bufio = self.tp(raw, 4)
2211 # Trigger readahead
2212 self.assertEqual(bufio.read(1), b"A")
2213 self.assertEqual(bufio.tell(), 1)
2214 # Overwriting should rewind the raw stream if it needs so
2215 bufio.write(b"B" * overwrite_size)
2216 self.assertEqual(bufio.tell(), overwrite_size + 1)
2217 # If the write size was smaller than the buffer size, flush() and
2218 # check that rewind happens.
2219 bufio.flush()
2220 self.assertEqual(bufio.tell(), overwrite_size + 1)
2221 s = raw.getvalue()
2222 self.assertEqual(s,
2223 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2224
Antoine Pitrou7c404892011-05-13 00:13:33 +02002225 def test_write_rewind_write(self):
2226 # Various combinations of reading / writing / seeking backwards / writing again
2227 def mutate(bufio, pos1, pos2):
2228 assert pos2 >= pos1
2229 # Fill the buffer
2230 bufio.seek(pos1)
2231 bufio.read(pos2 - pos1)
2232 bufio.write(b'\x02')
2233 # This writes earlier than the previous write, but still inside
2234 # the buffer.
2235 bufio.seek(pos1)
2236 bufio.write(b'\x01')
2237
2238 b = b"\x80\x81\x82\x83\x84"
2239 for i in range(0, len(b)):
2240 for j in range(i, len(b)):
2241 raw = self.BytesIO(b)
2242 bufio = self.tp(raw, 100)
2243 mutate(bufio, i, j)
2244 bufio.flush()
2245 expected = bytearray(b)
2246 expected[j] = 2
2247 expected[i] = 1
2248 self.assertEqual(raw.getvalue(), expected,
2249 "failed result for i=%d, j=%d" % (i, j))
2250
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00002251 def test_truncate_after_read_or_write(self):
2252 raw = self.BytesIO(b"A" * 10)
2253 bufio = self.tp(raw, 100)
2254 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2255 self.assertEqual(bufio.truncate(), 2)
2256 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2257 self.assertEqual(bufio.truncate(), 4)
2258
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002259 def test_misbehaved_io(self):
2260 BufferedReaderTest.test_misbehaved_io(self)
2261 BufferedWriterTest.test_misbehaved_io(self)
2262
Antoine Pitroue05565e2011-08-20 14:39:23 +02002263 def test_interleaved_read_write(self):
2264 # Test for issue #12213
2265 with self.BytesIO(b'abcdefgh') as raw:
2266 with self.tp(raw, 100) as f:
2267 f.write(b"1")
2268 self.assertEqual(f.read(1), b'b')
2269 f.write(b'2')
2270 self.assertEqual(f.read1(1), b'd')
2271 f.write(b'3')
2272 buf = bytearray(1)
2273 f.readinto(buf)
2274 self.assertEqual(buf, b'f')
2275 f.write(b'4')
2276 self.assertEqual(f.peek(1), b'h')
2277 f.flush()
2278 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2279
2280 with self.BytesIO(b'abc') as raw:
2281 with self.tp(raw, 100) as f:
2282 self.assertEqual(f.read(1), b'a')
2283 f.write(b"2")
2284 self.assertEqual(f.read(1), b'c')
2285 f.flush()
2286 self.assertEqual(raw.getvalue(), b'a2c')
2287
2288 def test_interleaved_readline_write(self):
2289 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2290 with self.tp(raw) as f:
2291 f.write(b'1')
2292 self.assertEqual(f.readline(), b'b\n')
2293 f.write(b'2')
2294 self.assertEqual(f.readline(), b'def\n')
2295 f.write(b'3')
2296 self.assertEqual(f.readline(), b'\n')
2297 f.flush()
2298 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2299
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002300 # You can't construct a BufferedRandom over a non-seekable stream.
2301 test_unseekable = None
2302
R David Murray67bfe802013-02-23 21:51:05 -05002303
Antoine Pitrou10f0c502012-07-29 19:02:46 +02002304class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002305 tp = io.BufferedRandom
2306
2307 def test_constructor(self):
2308 BufferedRandomTest.test_constructor(self)
2309 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08002310 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002311 if sys.maxsize > 0x7FFFFFFF:
2312 rawio = self.MockRawIO()
2313 bufio = self.tp(rawio)
2314 self.assertRaises((OverflowError, MemoryError, ValueError),
2315 bufio.__init__, rawio, sys.maxsize)
2316
2317 def test_garbage_collection(self):
2318 CBufferedReaderTest.test_garbage_collection(self)
2319 CBufferedWriterTest.test_garbage_collection(self)
2320
R David Murray67bfe802013-02-23 21:51:05 -05002321 def test_args_error(self):
2322 # Issue #17275
2323 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
2324 self.tp(io.BytesIO(), 1024, 1024, 1024)
2325
2326
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002327class PyBufferedRandomTest(BufferedRandomTest):
2328 tp = pyio.BufferedRandom
2329
2330
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002331# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2332# properties:
2333# - A single output character can correspond to many bytes of input.
2334# - The number of input bytes to complete the character can be
2335# undetermined until the last input byte is received.
2336# - The number of input bytes can vary depending on previous input.
2337# - A single input byte can correspond to many characters of output.
2338# - The number of output characters can be undetermined until the
2339# last input byte is received.
2340# - The number of output characters can vary depending on previous input.
2341
2342class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
2343 """
2344 For testing seek/tell behavior with a stateful, buffering decoder.
2345
2346 Input is a sequence of words. Words may be fixed-length (length set
2347 by input) or variable-length (period-terminated). In variable-length
2348 mode, extra periods are ignored. Possible words are:
2349 - 'i' followed by a number sets the input length, I (maximum 99).
2350 When I is set to 0, words are space-terminated.
2351 - 'o' followed by a number sets the output length, O (maximum 99).
2352 - Any other word is converted into a word followed by a period on
2353 the output. The output word consists of the input word truncated
2354 or padded out with hyphens to make its length equal to O. If O
2355 is 0, the word is output verbatim without truncating or padding.
2356 I and O are initially set to 1. When I changes, any buffered input is
2357 re-scanned according to the new I. EOF also terminates the last word.
2358 """
2359
2360 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00002361 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002362 self.reset()
2363
2364 def __repr__(self):
2365 return '<SID %x>' % id(self)
2366
2367 def reset(self):
2368 self.i = 1
2369 self.o = 1
2370 self.buffer = bytearray()
2371
2372 def getstate(self):
2373 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
2374 return bytes(self.buffer), i*100 + o
2375
2376 def setstate(self, state):
2377 buffer, io = state
2378 self.buffer = bytearray(buffer)
2379 i, o = divmod(io, 100)
2380 self.i, self.o = i ^ 1, o ^ 1
2381
2382 def decode(self, input, final=False):
2383 output = ''
2384 for b in input:
2385 if self.i == 0: # variable-length, terminated with period
2386 if b == ord('.'):
2387 if self.buffer:
2388 output += self.process_word()
2389 else:
2390 self.buffer.append(b)
2391 else: # fixed-length, terminate after self.i bytes
2392 self.buffer.append(b)
2393 if len(self.buffer) == self.i:
2394 output += self.process_word()
2395 if final and self.buffer: # EOF terminates the last word
2396 output += self.process_word()
2397 return output
2398
2399 def process_word(self):
2400 output = ''
2401 if self.buffer[0] == ord('i'):
2402 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2403 elif self.buffer[0] == ord('o'):
2404 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2405 else:
2406 output = self.buffer.decode('ascii')
2407 if len(output) < self.o:
2408 output += '-'*self.o # pad out with hyphens
2409 if self.o:
2410 output = output[:self.o] # truncate to output length
2411 output += '.'
2412 self.buffer = bytearray()
2413 return output
2414
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002415 codecEnabled = False
2416
2417 @classmethod
2418 def lookupTestDecoder(cls, name):
2419 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00002420 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002421 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00002422 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002423 incrementalencoder=None,
2424 streamreader=None, streamwriter=None,
2425 incrementaldecoder=cls)
2426
2427# Register the previous decoder for testing.
2428# Disabled by default, tests will enable it.
2429codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2430
2431
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002432class StatefulIncrementalDecoderTest(unittest.TestCase):
2433 """
2434 Make sure the StatefulIncrementalDecoder actually works.
2435 """
2436
2437 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002438 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002439 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002440 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002441 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002442 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002443 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002444 # I=0, O=6 (variable-length input, fixed-length output)
2445 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2446 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002447 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002448 # I=6, O=3 (fixed-length input > fixed-length output)
2449 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2450 # I=0, then 3; O=29, then 15 (with longer output)
2451 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2452 'a----------------------------.' +
2453 'b----------------------------.' +
2454 'cde--------------------------.' +
2455 'abcdefghijabcde.' +
2456 'a.b------------.' +
2457 '.c.------------.' +
2458 'd.e------------.' +
2459 'k--------------.' +
2460 'l--------------.' +
2461 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002462 ]
2463
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002464 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002465 # Try a few one-shot test cases.
2466 for input, eof, output in self.test_cases:
2467 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002468 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002469
2470 # Also test an unfinished decode, followed by forcing EOF.
2471 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002472 self.assertEqual(d.decode(b'oiabcd'), '')
2473 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002474
2475class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002476
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002477 def setUp(self):
2478 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2479 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002480 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002481
Guido van Rossumd0712812007-04-11 16:32:43 +00002482 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002483 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002484
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002485 def test_constructor(self):
2486 r = self.BytesIO(b"\xc3\xa9\n\n")
2487 b = self.BufferedReader(r, 1000)
2488 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002489 t.__init__(b, encoding="latin-1", newline="\r\n")
2490 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002491 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002492 t.__init__(b, encoding="utf-8", line_buffering=True)
2493 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002494 self.assertEqual(t.line_buffering, True)
2495 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002496 self.assertRaises(TypeError, t.__init__, b, newline=42)
2497 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2498
Serhiy Storchakaf24131f2015-04-16 11:19:43 +03002499 def test_uninitialized(self):
2500 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2501 del t
2502 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2503 self.assertRaises(Exception, repr, t)
2504 self.assertRaisesRegex((ValueError, AttributeError),
2505 'uninitialized|has no attribute',
2506 t.read, 0)
2507 t.__init__(self.MockRawIO())
2508 self.assertEqual(t.read(0), '')
2509
Nick Coghlana9b15242014-02-04 22:11:18 +10002510 def test_non_text_encoding_codecs_are_rejected(self):
2511 # Ensure the constructor complains if passed a codec that isn't
2512 # marked as a text encoding
2513 # http://bugs.python.org/issue20404
2514 r = self.BytesIO()
2515 b = self.BufferedWriter(r)
2516 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2517 self.TextIOWrapper(b, encoding="hex")
2518
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002519 def test_detach(self):
2520 r = self.BytesIO()
2521 b = self.BufferedWriter(r)
2522 t = self.TextIOWrapper(b)
2523 self.assertIs(t.detach(), b)
2524
2525 t = self.TextIOWrapper(b, encoding="ascii")
2526 t.write("howdy")
2527 self.assertFalse(r.getvalue())
2528 t.detach()
2529 self.assertEqual(r.getvalue(), b"howdy")
2530 self.assertRaises(ValueError, t.detach)
2531
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002532 # Operations independent of the detached stream should still work
2533 repr(t)
2534 self.assertEqual(t.encoding, "ascii")
2535 self.assertEqual(t.errors, "strict")
2536 self.assertFalse(t.line_buffering)
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002537 self.assertFalse(t.write_through)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002538
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002539 def test_repr(self):
2540 raw = self.BytesIO("hello".encode("utf-8"))
2541 b = self.BufferedReader(raw)
2542 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002543 modname = self.TextIOWrapper.__module__
2544 self.assertEqual(repr(t),
2545 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2546 raw.name = "dummy"
2547 self.assertEqual(repr(t),
2548 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002549 t.mode = "r"
2550 self.assertEqual(repr(t),
2551 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002552 raw.name = b"dummy"
2553 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002554 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002555
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002556 t.buffer.detach()
2557 repr(t) # Should not raise an exception
2558
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02002559 def test_recursive_repr(self):
2560 # Issue #25455
2561 raw = self.BytesIO()
2562 t = self.TextIOWrapper(raw)
2563 with support.swap_attr(raw, 'name', t):
2564 try:
2565 repr(t) # Should not crash
2566 except RuntimeError:
2567 pass
2568
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002569 def test_line_buffering(self):
2570 r = self.BytesIO()
2571 b = self.BufferedWriter(r, 1000)
2572 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002573 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002574 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002575 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002576 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002577 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002578 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002579
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002580 def test_reconfigure_line_buffering(self):
2581 r = self.BytesIO()
2582 b = self.BufferedWriter(r, 1000)
2583 t = self.TextIOWrapper(b, newline="\n", line_buffering=False)
2584 t.write("AB\nC")
2585 self.assertEqual(r.getvalue(), b"")
2586
2587 t.reconfigure(line_buffering=True) # implicit flush
2588 self.assertEqual(r.getvalue(), b"AB\nC")
2589 t.write("DEF\nG")
2590 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2591 t.write("H")
2592 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2593 t.reconfigure(line_buffering=False) # implicit flush
2594 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2595 t.write("IJ")
2596 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2597
2598 # Keeping default value
2599 t.reconfigure()
2600 t.reconfigure(line_buffering=None)
2601 self.assertEqual(t.line_buffering, False)
2602 t.reconfigure(line_buffering=True)
2603 t.reconfigure()
2604 t.reconfigure(line_buffering=None)
2605 self.assertEqual(t.line_buffering, True)
2606
Victor Stinner91106cd2017-12-13 12:29:09 +01002607 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002608 def test_default_encoding(self):
2609 old_environ = dict(os.environ)
2610 try:
2611 # try to get a user preferred encoding different than the current
2612 # locale encoding to check that TextIOWrapper() uses the current
2613 # locale encoding and not the user preferred encoding
2614 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2615 if key in os.environ:
2616 del os.environ[key]
2617
2618 current_locale_encoding = locale.getpreferredencoding(False)
2619 b = self.BytesIO()
2620 t = self.TextIOWrapper(b)
2621 self.assertEqual(t.encoding, current_locale_encoding)
2622 finally:
2623 os.environ.clear()
2624 os.environ.update(old_environ)
2625
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002626 @support.cpython_only
Victor Stinner91106cd2017-12-13 12:29:09 +01002627 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Serhiy Storchaka78980432013-01-15 01:12:17 +02002628 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002629 # Issue 15989
2630 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002631 b = self.BytesIO()
2632 b.fileno = lambda: _testcapi.INT_MAX + 1
2633 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2634 b.fileno = lambda: _testcapi.UINT_MAX + 1
2635 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2636
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002637 def test_encoding(self):
2638 # Check the encoding attribute is always set, and valid
2639 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002640 t = self.TextIOWrapper(b, encoding="utf-8")
2641 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002642 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002643 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002644 codecs.lookup(t.encoding)
2645
2646 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002647 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002648 b = self.BytesIO(b"abc\n\xff\n")
2649 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002650 self.assertRaises(UnicodeError, t.read)
2651 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002652 b = self.BytesIO(b"abc\n\xff\n")
2653 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002654 self.assertRaises(UnicodeError, t.read)
2655 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002656 b = self.BytesIO(b"abc\n\xff\n")
2657 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002658 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002659 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002660 b = self.BytesIO(b"abc\n\xff\n")
2661 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002662 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002663
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002664 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002665 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002666 b = self.BytesIO()
2667 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002668 self.assertRaises(UnicodeError, t.write, "\xff")
2669 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002670 b = self.BytesIO()
2671 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002672 self.assertRaises(UnicodeError, t.write, "\xff")
2673 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002674 b = self.BytesIO()
2675 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002676 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002677 t.write("abc\xffdef\n")
2678 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002679 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002680 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002681 b = self.BytesIO()
2682 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002683 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002684 t.write("abc\xffdef\n")
2685 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002686 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002687
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002688 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002689 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2690
2691 tests = [
2692 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002693 [ '', input_lines ],
2694 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2695 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2696 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002697 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002698 encodings = (
2699 'utf-8', 'latin-1',
2700 'utf-16', 'utf-16-le', 'utf-16-be',
2701 'utf-32', 'utf-32-le', 'utf-32-be',
2702 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002703
Guido van Rossum8358db22007-08-18 21:39:55 +00002704 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002705 # character in TextIOWrapper._pending_line.
2706 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002707 # XXX: str.encode() should return bytes
2708 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002709 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002710 for bufsize in range(1, 10):
2711 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002712 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2713 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002714 encoding=encoding)
2715 if do_reads:
2716 got_lines = []
2717 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002718 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002719 if c2 == '':
2720 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002721 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002722 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002723 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002724 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002725
2726 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002727 self.assertEqual(got_line, exp_line)
2728 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002729
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002730 def test_newlines_input(self):
2731 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002732 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2733 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002734 (None, normalized.decode("ascii").splitlines(keepends=True)),
2735 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002736 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2737 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2738 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002739 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002740 buf = self.BytesIO(testdata)
2741 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002742 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002743 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002744 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002745
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002746 def test_newlines_output(self):
2747 testdict = {
2748 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2749 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2750 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2751 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2752 }
2753 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2754 for newline, expected in tests:
2755 buf = self.BytesIO()
2756 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2757 txt.write("AAA\nB")
2758 txt.write("BB\nCCC\n")
2759 txt.write("X\rY\r\nZ")
2760 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002761 self.assertEqual(buf.closed, False)
2762 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002763
2764 def test_destructor(self):
2765 l = []
2766 base = self.BytesIO
2767 class MyBytesIO(base):
2768 def close(self):
2769 l.append(self.getvalue())
2770 base.close(self)
2771 b = MyBytesIO()
2772 t = self.TextIOWrapper(b, encoding="ascii")
2773 t.write("abc")
2774 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002775 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002776 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002777
2778 def test_override_destructor(self):
2779 record = []
2780 class MyTextIO(self.TextIOWrapper):
2781 def __del__(self):
2782 record.append(1)
2783 try:
2784 f = super().__del__
2785 except AttributeError:
2786 pass
2787 else:
2788 f()
2789 def close(self):
2790 record.append(2)
2791 super().close()
2792 def flush(self):
2793 record.append(3)
2794 super().flush()
2795 b = self.BytesIO()
2796 t = MyTextIO(b, encoding="ascii")
2797 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002798 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002799 self.assertEqual(record, [1, 2, 3])
2800
2801 def test_error_through_destructor(self):
2802 # Test that the exception state is not modified by a destructor,
2803 # even if close() fails.
2804 rawio = self.CloseFailureIO()
2805 def f():
2806 self.TextIOWrapper(rawio).xyzzy
2807 with support.captured_output("stderr") as s:
2808 self.assertRaises(AttributeError, f)
2809 s = s.getvalue().strip()
2810 if s:
2811 # The destructor *may* have printed an unraisable error, check it
2812 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002813 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002814 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002815
Guido van Rossum9b76da62007-04-11 01:09:03 +00002816 # Systematic tests of the text I/O API
2817
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002818 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002819 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002820 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002821 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002822 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002823 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002824 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002825 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002826 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002827 self.assertEqual(f.tell(), 0)
2828 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002829 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002830 self.assertEqual(f.seek(0), 0)
2831 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002832 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002833 self.assertEqual(f.read(2), "ab")
2834 self.assertEqual(f.read(1), "c")
2835 self.assertEqual(f.read(1), "")
2836 self.assertEqual(f.read(), "")
2837 self.assertEqual(f.tell(), cookie)
2838 self.assertEqual(f.seek(0), 0)
2839 self.assertEqual(f.seek(0, 2), cookie)
2840 self.assertEqual(f.write("def"), 3)
2841 self.assertEqual(f.seek(cookie), cookie)
2842 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002843 if enc.startswith("utf"):
2844 self.multi_line_test(f, enc)
2845 f.close()
2846
2847 def multi_line_test(self, f, enc):
2848 f.seek(0)
2849 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002850 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002851 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002852 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002853 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002854 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002855 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002856 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002857 wlines.append((f.tell(), line))
2858 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002859 f.seek(0)
2860 rlines = []
2861 while True:
2862 pos = f.tell()
2863 line = f.readline()
2864 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002865 break
2866 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002867 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002868
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002869 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002870 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002871 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002872 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002873 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002874 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002875 p2 = f.tell()
2876 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002877 self.assertEqual(f.tell(), p0)
2878 self.assertEqual(f.readline(), "\xff\n")
2879 self.assertEqual(f.tell(), p1)
2880 self.assertEqual(f.readline(), "\xff\n")
2881 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002882 f.seek(0)
2883 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002884 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002885 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002886 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002887 f.close()
2888
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002889 def test_seeking(self):
2890 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002891 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002892 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002893 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002894 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002895 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002896 suffix = bytes(u_suffix.encode("utf-8"))
2897 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002898 with self.open(support.TESTFN, "wb") as f:
2899 f.write(line*2)
2900 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2901 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002902 self.assertEqual(s, str(prefix, "ascii"))
2903 self.assertEqual(f.tell(), prefix_size)
2904 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002905
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002906 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002907 # Regression test for a specific bug
2908 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002909 with self.open(support.TESTFN, "wb") as f:
2910 f.write(data)
2911 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2912 f._CHUNK_SIZE # Just test that it exists
2913 f._CHUNK_SIZE = 2
2914 f.readline()
2915 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002916
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002917 def test_seek_and_tell(self):
2918 #Test seek/tell using the StatefulIncrementalDecoder.
2919 # Make test faster by doing smaller seeks
2920 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002921
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002922 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002923 """Tell/seek to various points within a data stream and ensure
2924 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002925 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002926 f.write(data)
2927 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002928 f = self.open(support.TESTFN, encoding='test_decoder')
2929 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002930 decoded = f.read()
2931 f.close()
2932
Neal Norwitze2b07052008-03-18 19:52:05 +00002933 for i in range(min_pos, len(decoded) + 1): # seek positions
2934 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002935 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002936 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002937 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002938 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002939 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002940 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002941 f.close()
2942
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002943 # Enable the test decoder.
2944 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002945
2946 # Run the tests.
2947 try:
2948 # Try each test case.
2949 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002950 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002951
2952 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002953 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2954 offset = CHUNK_SIZE - len(input)//2
2955 prefix = b'.'*offset
2956 # Don't bother seeking into the prefix (takes too long).
2957 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002958 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002959
2960 # Ensure our test decoder won't interfere with subsequent tests.
2961 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002962 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002963
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002964 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002965 data = "1234567890"
2966 tests = ("utf-16",
2967 "utf-16-le",
2968 "utf-16-be",
2969 "utf-32",
2970 "utf-32-le",
2971 "utf-32-be")
2972 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002973 buf = self.BytesIO()
2974 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002975 # Check if the BOM is written only once (see issue1753).
2976 f.write(data)
2977 f.write(data)
2978 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002979 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002980 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002981 self.assertEqual(f.read(), data * 2)
2982 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002983
Benjamin Petersona1b49012009-03-31 23:11:32 +00002984 def test_unreadable(self):
2985 class UnReadable(self.BytesIO):
2986 def readable(self):
2987 return False
2988 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002989 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002990
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002991 def test_read_one_by_one(self):
2992 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002993 reads = ""
2994 while True:
2995 c = txt.read(1)
2996 if not c:
2997 break
2998 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002999 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003000
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00003001 def test_readlines(self):
3002 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
3003 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
3004 txt.seek(0)
3005 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
3006 txt.seek(0)
3007 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
3008
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003009 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003010 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003011 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003012 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003013 reads = ""
3014 while True:
3015 c = txt.read(128)
3016 if not c:
3017 break
3018 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003019 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003020
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02003021 def test_writelines(self):
3022 l = ['ab', 'cd', 'ef']
3023 buf = self.BytesIO()
3024 txt = self.TextIOWrapper(buf)
3025 txt.writelines(l)
3026 txt.flush()
3027 self.assertEqual(buf.getvalue(), b'abcdef')
3028
3029 def test_writelines_userlist(self):
3030 l = UserList(['ab', 'cd', 'ef'])
3031 buf = self.BytesIO()
3032 txt = self.TextIOWrapper(buf)
3033 txt.writelines(l)
3034 txt.flush()
3035 self.assertEqual(buf.getvalue(), b'abcdef')
3036
3037 def test_writelines_error(self):
3038 txt = self.TextIOWrapper(self.BytesIO())
3039 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3040 self.assertRaises(TypeError, txt.writelines, None)
3041 self.assertRaises(TypeError, txt.writelines, b'abc')
3042
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003043 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003044 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003045
3046 # read one char at a time
3047 reads = ""
3048 while True:
3049 c = txt.read(1)
3050 if not c:
3051 break
3052 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003053 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003054
3055 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003056 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003057 txt._CHUNK_SIZE = 4
3058
3059 reads = ""
3060 while True:
3061 c = txt.read(4)
3062 if not c:
3063 break
3064 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003065 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003066
3067 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003068 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003069 txt._CHUNK_SIZE = 4
3070
3071 reads = txt.read(4)
3072 reads += txt.read(4)
3073 reads += txt.readline()
3074 reads += txt.readline()
3075 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003076 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003077
3078 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003079 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003080 txt._CHUNK_SIZE = 4
3081
3082 reads = txt.read(4)
3083 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003084 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003085
3086 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003087 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003088 txt._CHUNK_SIZE = 4
3089
3090 reads = txt.read(4)
3091 pos = txt.tell()
3092 txt.seek(0)
3093 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003094 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003095
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003096 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003097 buffer = self.BytesIO(self.testdata)
3098 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003099
3100 self.assertEqual(buffer.seekable(), txt.seekable())
3101
Antoine Pitroue4501852009-05-14 18:55:55 +00003102 def test_append_bom(self):
3103 # The BOM is not written again when appending to a non-empty file
3104 filename = support.TESTFN
3105 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3106 with self.open(filename, 'w', encoding=charset) as f:
3107 f.write('aaa')
3108 pos = f.tell()
3109 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003110 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003111
3112 with self.open(filename, 'a', encoding=charset) as f:
3113 f.write('xxx')
3114 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003115 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003116
3117 def test_seek_bom(self):
3118 # Same test, but when seeking manually
3119 filename = support.TESTFN
3120 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3121 with self.open(filename, 'w', encoding=charset) as f:
3122 f.write('aaa')
3123 pos = f.tell()
3124 with self.open(filename, 'r+', encoding=charset) as f:
3125 f.seek(pos)
3126 f.write('zzz')
3127 f.seek(0)
3128 f.write('bbb')
3129 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003130 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003131
Antoine Pitrou85e3ee72015-04-13 20:01:21 +02003132 def test_seek_append_bom(self):
3133 # Same test, but first seek to the start and then to the end
3134 filename = support.TESTFN
3135 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3136 with self.open(filename, 'w', encoding=charset) as f:
3137 f.write('aaa')
3138 with self.open(filename, 'a', encoding=charset) as f:
3139 f.seek(0)
3140 f.seek(0, self.SEEK_END)
3141 f.write('xxx')
3142 with self.open(filename, 'rb') as f:
3143 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3144
Benjamin Peterson0926ad12009-06-06 18:02:12 +00003145 def test_errors_property(self):
3146 with self.open(support.TESTFN, "w") as f:
3147 self.assertEqual(f.errors, "strict")
3148 with self.open(support.TESTFN, "w", errors="replace") as f:
3149 self.assertEqual(f.errors, "replace")
3150
Brett Cannon31f59292011-02-21 19:29:56 +00003151 @support.no_tracing
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003152 def test_threads_write(self):
3153 # Issue6750: concurrent writes could duplicate data
3154 event = threading.Event()
3155 with self.open(support.TESTFN, "w", buffering=1) as f:
3156 def run(n):
3157 text = "Thread%03d\n" % n
3158 event.wait()
3159 f.write(text)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003160 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003161 for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003162 with support.start_threads(threads, event.set):
3163 time.sleep(0.02)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003164 with self.open(support.TESTFN) as f:
3165 content = f.read()
3166 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003167 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003168
Antoine Pitrou6be88762010-05-03 16:48:20 +00003169 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003170 # Test that text file is closed despite failed flush
3171 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00003172 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003173 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00003174 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003175 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003176 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00003177 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003178 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06003179 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003180 self.assertTrue(txt.buffer.closed)
3181 self.assertTrue(closed) # flush() called
3182 self.assertFalse(closed[0]) # flush() called before file closed
3183 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02003184 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003185
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003186 def test_close_error_on_close(self):
3187 buffer = self.BytesIO(self.testdata)
3188 def bad_flush():
3189 raise OSError('flush')
3190 def bad_close():
3191 raise OSError('close')
3192 buffer.close = bad_close
3193 txt = self.TextIOWrapper(buffer, encoding="ascii")
3194 txt.flush = bad_flush
3195 with self.assertRaises(OSError) as err: # exception not swallowed
3196 txt.close()
3197 self.assertEqual(err.exception.args, ('close',))
3198 self.assertIsInstance(err.exception.__context__, OSError)
3199 self.assertEqual(err.exception.__context__.args, ('flush',))
3200 self.assertFalse(txt.closed)
3201
3202 def test_nonnormalized_close_error_on_close(self):
3203 # Issue #21677
3204 buffer = self.BytesIO(self.testdata)
3205 def bad_flush():
3206 raise non_existing_flush
3207 def bad_close():
3208 raise non_existing_close
3209 buffer.close = bad_close
3210 txt = self.TextIOWrapper(buffer, encoding="ascii")
3211 txt.flush = bad_flush
3212 with self.assertRaises(NameError) as err: # exception not swallowed
3213 txt.close()
3214 self.assertIn('non_existing_close', str(err.exception))
3215 self.assertIsInstance(err.exception.__context__, NameError)
3216 self.assertIn('non_existing_flush', str(err.exception.__context__))
3217 self.assertFalse(txt.closed)
3218
Antoine Pitrou6be88762010-05-03 16:48:20 +00003219 def test_multi_close(self):
3220 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3221 txt.close()
3222 txt.close()
3223 txt.close()
3224 self.assertRaises(ValueError, txt.flush)
3225
Antoine Pitrou0d739d72010-09-05 23:01:12 +00003226 def test_unseekable(self):
3227 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
3228 self.assertRaises(self.UnsupportedOperation, txt.tell)
3229 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3230
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003231 def test_readonly_attributes(self):
3232 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3233 buf = self.BytesIO(self.testdata)
3234 with self.assertRaises(AttributeError):
3235 txt.buffer = buf
3236
Antoine Pitroue96ec682011-07-23 21:46:35 +02003237 def test_rawio(self):
3238 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3239 # that subprocess.Popen() can have the required unbuffered
3240 # semantics with universal_newlines=True.
3241 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3242 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3243 # Reads
3244 self.assertEqual(txt.read(4), 'abcd')
3245 self.assertEqual(txt.readline(), 'efghi\n')
3246 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3247
3248 def test_rawio_write_through(self):
3249 # Issue #12591: with write_through=True, writes don't need a flush
3250 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3251 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3252 write_through=True)
3253 txt.write('1')
3254 txt.write('23\n4')
3255 txt.write('5')
3256 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3257
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003258 def test_bufio_write_through(self):
3259 # Issue #21396: write_through=True doesn't force a flush()
3260 # on the underlying binary buffered object.
3261 flush_called, write_called = [], []
3262 class BufferedWriter(self.BufferedWriter):
3263 def flush(self, *args, **kwargs):
3264 flush_called.append(True)
3265 return super().flush(*args, **kwargs)
3266 def write(self, *args, **kwargs):
3267 write_called.append(True)
3268 return super().write(*args, **kwargs)
3269
3270 rawio = self.BytesIO()
3271 data = b"a"
3272 bufio = BufferedWriter(rawio, len(data)*2)
3273 textio = self.TextIOWrapper(bufio, encoding='ascii',
3274 write_through=True)
3275 # write to the buffered io but don't overflow the buffer
3276 text = data.decode('ascii')
3277 textio.write(text)
3278
3279 # buffer.flush is not called with write_through=True
3280 self.assertFalse(flush_called)
3281 # buffer.write *is* called with write_through=True
3282 self.assertTrue(write_called)
3283 self.assertEqual(rawio.getvalue(), b"") # no flush
3284
3285 write_called = [] # reset
3286 textio.write(text * 10) # total content is larger than bufio buffer
3287 self.assertTrue(write_called)
3288 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3289
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02003290 def test_reconfigure_write_through(self):
3291 raw = self.MockRawIO([])
3292 t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3293 t.write('1')
3294 t.reconfigure(write_through=True) # implied flush
3295 self.assertEqual(t.write_through, True)
3296 self.assertEqual(b''.join(raw._write_stack), b'1')
3297 t.write('23')
3298 self.assertEqual(b''.join(raw._write_stack), b'123')
3299 t.reconfigure(write_through=False)
3300 self.assertEqual(t.write_through, False)
3301 t.write('45')
3302 t.flush()
3303 self.assertEqual(b''.join(raw._write_stack), b'12345')
3304 # Keeping default value
3305 t.reconfigure()
3306 t.reconfigure(write_through=None)
3307 self.assertEqual(t.write_through, False)
3308 t.reconfigure(write_through=True)
3309 t.reconfigure()
3310 t.reconfigure(write_through=None)
3311 self.assertEqual(t.write_through, True)
3312
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003313 def test_read_nonbytes(self):
3314 # Issue #17106
3315 # Crash when underlying read() returns non-bytes
3316 t = self.TextIOWrapper(self.StringIO('a'))
3317 self.assertRaises(TypeError, t.read, 1)
3318 t = self.TextIOWrapper(self.StringIO('a'))
3319 self.assertRaises(TypeError, t.readline)
3320 t = self.TextIOWrapper(self.StringIO('a'))
3321 self.assertRaises(TypeError, t.read)
3322
Oren Milmana5b4ea12017-08-25 21:14:54 +03003323 def test_illegal_encoder(self):
3324 # Issue 31271: Calling write() while the return value of encoder's
3325 # encode() is invalid shouldn't cause an assertion failure.
3326 rot13 = codecs.lookup("rot13")
3327 with support.swap_attr(rot13, '_is_text_encoding', True):
3328 t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
3329 self.assertRaises(TypeError, t.write, 'bar')
3330
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003331 def test_illegal_decoder(self):
3332 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10003333 # Bypass the early encoding check added in issue 20404
3334 def _make_illegal_wrapper():
3335 quopri = codecs.lookup("quopri")
3336 quopri._is_text_encoding = True
3337 try:
3338 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3339 newline='\n', encoding="quopri")
3340 finally:
3341 quopri._is_text_encoding = False
3342 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003343 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10003344 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003345 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10003346 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003347 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10003348 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003349 self.assertRaises(TypeError, t.read)
3350
Oren Milmanba7d7362017-08-29 11:58:27 +03003351 # Issue 31243: calling read() while the return value of decoder's
3352 # getstate() is invalid should neither crash the interpreter nor
3353 # raise a SystemError.
3354 def _make_very_illegal_wrapper(getstate_ret_val):
3355 class BadDecoder:
3356 def getstate(self):
3357 return getstate_ret_val
3358 def _get_bad_decoder(dummy):
3359 return BadDecoder()
3360 quopri = codecs.lookup("quopri")
3361 with support.swap_attr(quopri, 'incrementaldecoder',
3362 _get_bad_decoder):
3363 return _make_illegal_wrapper()
3364 t = _make_very_illegal_wrapper(42)
3365 self.assertRaises(TypeError, t.read, 42)
3366 t = _make_very_illegal_wrapper(())
3367 self.assertRaises(TypeError, t.read, 42)
3368 t = _make_very_illegal_wrapper((1, 2))
3369 self.assertRaises(TypeError, t.read, 42)
3370
Antoine Pitrou712cb732013-12-21 15:51:54 +01003371 def _check_create_at_shutdown(self, **kwargs):
3372 # Issue #20037: creating a TextIOWrapper at shutdown
3373 # shouldn't crash the interpreter.
3374 iomod = self.io.__name__
3375 code = """if 1:
3376 import codecs
3377 import {iomod} as io
3378
3379 # Avoid looking up codecs at shutdown
3380 codecs.lookup('utf-8')
3381
3382 class C:
3383 def __init__(self):
3384 self.buf = io.BytesIO()
3385 def __del__(self):
3386 io.TextIOWrapper(self.buf, **{kwargs})
3387 print("ok")
3388 c = C()
3389 """.format(iomod=iomod, kwargs=kwargs)
3390 return assert_python_ok("-c", code)
3391
Serhiy Storchakaa7930372016-07-03 22:27:26 +03003392 @support.requires_type_collecting
Antoine Pitrou712cb732013-12-21 15:51:54 +01003393 def test_create_at_shutdown_without_encoding(self):
3394 rc, out, err = self._check_create_at_shutdown()
3395 if err:
3396 # Can error out with a RuntimeError if the module state
3397 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10003398 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01003399 else:
3400 self.assertEqual("ok", out.decode().strip())
3401
Serhiy Storchakaa7930372016-07-03 22:27:26 +03003402 @support.requires_type_collecting
Antoine Pitrou712cb732013-12-21 15:51:54 +01003403 def test_create_at_shutdown_with_encoding(self):
3404 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3405 errors='strict')
3406 self.assertFalse(err)
3407 self.assertEqual("ok", out.decode().strip())
3408
Antoine Pitroub8503892014-04-29 10:14:02 +02003409 def test_read_byteslike(self):
3410 r = MemviewBytesIO(b'Just some random string\n')
3411 t = self.TextIOWrapper(r, 'utf-8')
3412
3413 # TextIOwrapper will not read the full string, because
3414 # we truncate it to a multiple of the native int size
3415 # so that we can construct a more complex memoryview.
3416 bytes_val = _to_memoryview(r.getvalue()).tobytes()
3417
3418 self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3419
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003420 def test_issue22849(self):
3421 class F(object):
3422 def readable(self): return True
3423 def writable(self): return True
3424 def seekable(self): return True
3425
3426 for i in range(10):
3427 try:
3428 self.TextIOWrapper(F(), encoding='utf-8')
3429 except Exception:
3430 pass
3431
3432 F.tell = lambda x: 0
3433 t = self.TextIOWrapper(F(), encoding='utf-8')
3434
INADA Naoki507434f2017-12-21 09:59:53 +09003435 def test_reconfigure_encoding_read(self):
3436 # latin1 -> utf8
3437 # (latin1 can decode utf-8 encoded string)
3438 data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3439 raw = self.BytesIO(data)
3440 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3441 self.assertEqual(txt.readline(), 'abc\xe9\n')
3442 with self.assertRaises(self.UnsupportedOperation):
3443 txt.reconfigure(encoding='utf-8')
3444 with self.assertRaises(self.UnsupportedOperation):
3445 txt.reconfigure(newline=None)
3446
3447 def test_reconfigure_write_fromascii(self):
3448 # ascii has a specific encodefunc in the C implementation,
3449 # but utf-8-sig has not. Make sure that we get rid of the
3450 # cached encodefunc when we switch encoders.
3451 raw = self.BytesIO()
3452 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3453 txt.write('foo\n')
3454 txt.reconfigure(encoding='utf-8-sig')
3455 txt.write('\xe9\n')
3456 txt.flush()
3457 self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3458
3459 def test_reconfigure_write(self):
3460 # latin -> utf8
3461 raw = self.BytesIO()
3462 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3463 txt.write('abc\xe9\n')
3464 txt.reconfigure(encoding='utf-8')
3465 self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3466 txt.write('d\xe9f\n')
3467 txt.flush()
3468 self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3469
3470 # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3471 # the file
3472 raw = self.BytesIO()
3473 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3474 txt.write('abc\n')
3475 txt.reconfigure(encoding='utf-8-sig')
3476 txt.write('d\xe9f\n')
3477 txt.flush()
3478 self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3479
3480 def test_reconfigure_write_non_seekable(self):
3481 raw = self.BytesIO()
3482 raw.seekable = lambda: False
3483 raw.seek = None
3484 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3485 txt.write('abc\n')
3486 txt.reconfigure(encoding='utf-8-sig')
3487 txt.write('d\xe9f\n')
3488 txt.flush()
3489
3490 # If the raw stream is not seekable, there'll be a BOM
3491 self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3492
3493 def test_reconfigure_defaults(self):
3494 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
3495 txt.reconfigure(encoding=None)
3496 self.assertEqual(txt.encoding, 'ascii')
3497 self.assertEqual(txt.errors, 'replace')
3498 txt.write('LF\n')
3499
3500 txt.reconfigure(newline='\r\n')
3501 self.assertEqual(txt.encoding, 'ascii')
3502 self.assertEqual(txt.errors, 'replace')
3503
3504 txt.reconfigure(errors='ignore')
3505 self.assertEqual(txt.encoding, 'ascii')
3506 self.assertEqual(txt.errors, 'ignore')
3507 txt.write('CRLF\n')
3508
3509 txt.reconfigure(encoding='utf-8', newline=None)
3510 self.assertEqual(txt.errors, 'strict')
3511 txt.seek(0)
3512 self.assertEqual(txt.read(), 'LF\nCRLF\n')
3513
3514 self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3515
3516 def test_reconfigure_newline(self):
3517 raw = self.BytesIO(b'CR\rEOF')
3518 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3519 txt.reconfigure(newline=None)
3520 self.assertEqual(txt.readline(), 'CR\n')
3521 raw = self.BytesIO(b'CR\rEOF')
3522 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3523 txt.reconfigure(newline='')
3524 self.assertEqual(txt.readline(), 'CR\r')
3525 raw = self.BytesIO(b'CR\rLF\nEOF')
3526 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3527 txt.reconfigure(newline='\n')
3528 self.assertEqual(txt.readline(), 'CR\rLF\n')
3529 raw = self.BytesIO(b'LF\nCR\rEOF')
3530 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3531 txt.reconfigure(newline='\r')
3532 self.assertEqual(txt.readline(), 'LF\nCR\r')
3533 raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3534 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3535 txt.reconfigure(newline='\r\n')
3536 self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3537
3538 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3539 txt.reconfigure(newline=None)
3540 txt.write('linesep\n')
3541 txt.reconfigure(newline='')
3542 txt.write('LF\n')
3543 txt.reconfigure(newline='\n')
3544 txt.write('LF\n')
3545 txt.reconfigure(newline='\r')
3546 txt.write('CR\n')
3547 txt.reconfigure(newline='\r\n')
3548 txt.write('CRLF\n')
3549 expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3550 self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3551
Zackery Spytz23db9352018-06-29 04:14:58 -06003552 def test_issue25862(self):
3553 # Assertion failures occurred in tell() after read() and write().
3554 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3555 t.read(1)
3556 t.read()
3557 t.tell()
3558 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3559 t.read(1)
3560 t.write('x')
3561 t.tell()
3562
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003563
Antoine Pitroub8503892014-04-29 10:14:02 +02003564class MemviewBytesIO(io.BytesIO):
3565 '''A BytesIO object whose read method returns memoryviews
3566 rather than bytes'''
3567
3568 def read1(self, len_):
3569 return _to_memoryview(super().read1(len_))
3570
3571 def read(self, len_):
3572 return _to_memoryview(super().read(len_))
3573
3574def _to_memoryview(buf):
3575 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3576
3577 arr = array.array('i')
3578 idx = len(buf) - len(buf) % arr.itemsize
3579 arr.frombytes(buf[:idx])
3580 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003581
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003582
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003583class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003584 io = io
Steve Dowerfe8f4c92016-09-07 09:31:52 -07003585 shutdown_error = "RuntimeError: could not find io module state"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003586
3587 def test_initialization(self):
3588 r = self.BytesIO(b"\xc3\xa9\n\n")
3589 b = self.BufferedReader(r, 1000)
3590 t = self.TextIOWrapper(b)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003591 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
3592 self.assertRaises(ValueError, t.read)
3593
Benjamin Peterson10e76b62014-12-21 20:51:50 -06003594 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
3595 self.assertRaises(Exception, repr, t)
3596
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003597 def test_garbage_collection(self):
3598 # C TextIOWrapper objects are collected, and collecting them flushes
3599 # all data to disk.
3600 # The Python version has __del__, so it ends in gc.garbage instead.
Antoine Pitrou796564c2013-07-30 19:59:21 +02003601 with support.check_warnings(('', ResourceWarning)):
3602 rawio = io.FileIO(support.TESTFN, "wb")
3603 b = self.BufferedWriter(rawio)
3604 t = self.TextIOWrapper(b, encoding="ascii")
3605 t.write("456def")
3606 t.x = t
3607 wr = weakref.ref(t)
3608 del t
3609 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003610 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00003611 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003612 self.assertEqual(f.read(), b"456def")
3613
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003614 def test_rwpair_cleared_before_textio(self):
3615 # Issue 13070: TextIOWrapper's finalization would crash when called
3616 # after the reference to the underlying BufferedRWPair's writer got
3617 # cleared by the GC.
3618 for i in range(1000):
3619 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3620 t1 = self.TextIOWrapper(b1, encoding="ascii")
3621 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3622 t2 = self.TextIOWrapper(b2, encoding="ascii")
3623 # circular references
3624 t1.buddy = t2
3625 t2.buddy = t1
3626 support.gc_collect()
3627
3628
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003629class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003630 io = pyio
Steve Dower6d5cf8a2016-09-09 09:21:01 -07003631 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003632
3633
3634class IncrementalNewlineDecoderTest(unittest.TestCase):
3635
3636 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003637 # UTF-8 specific tests for a newline decoder
3638 def _check_decode(b, s, **kwargs):
3639 # We exercise getstate() / setstate() as well as decode()
3640 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003641 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003642 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003643 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003644
Antoine Pitrou180a3362008-12-14 16:36:46 +00003645 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003646
Antoine Pitrou180a3362008-12-14 16:36:46 +00003647 _check_decode(b'\xe8', "")
3648 _check_decode(b'\xa2', "")
3649 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003650
Antoine Pitrou180a3362008-12-14 16:36:46 +00003651 _check_decode(b'\xe8', "")
3652 _check_decode(b'\xa2', "")
3653 _check_decode(b'\x88', "\u8888")
3654
3655 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003656 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3657
Antoine Pitrou180a3362008-12-14 16:36:46 +00003658 decoder.reset()
3659 _check_decode(b'\n', "\n")
3660 _check_decode(b'\r', "")
3661 _check_decode(b'', "\n", final=True)
3662 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003663
Antoine Pitrou180a3362008-12-14 16:36:46 +00003664 _check_decode(b'\r', "")
3665 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003666
Antoine Pitrou180a3362008-12-14 16:36:46 +00003667 _check_decode(b'\r\r\n', "\n\n")
3668 _check_decode(b'\r', "")
3669 _check_decode(b'\r', "\n")
3670 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003671
Antoine Pitrou180a3362008-12-14 16:36:46 +00003672 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3673 _check_decode(b'\xe8\xa2\x88', "\u8888")
3674 _check_decode(b'\n', "\n")
3675 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3676 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003677
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003678 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003679 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003680 if encoding is not None:
3681 encoder = codecs.getincrementalencoder(encoding)()
3682 def _decode_bytewise(s):
3683 # Decode one byte at a time
3684 for b in encoder.encode(s):
3685 result.append(decoder.decode(bytes([b])))
3686 else:
3687 encoder = None
3688 def _decode_bytewise(s):
3689 # Decode one char at a time
3690 for c in s:
3691 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003692 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003693 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003694 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003695 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003696 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003697 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003698 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003699 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003700 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003701 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003702 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003703 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003704 input = "abc"
3705 if encoder is not None:
3706 encoder.reset()
3707 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003708 self.assertEqual(decoder.decode(input), "abc")
3709 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003710
3711 def test_newline_decoder(self):
3712 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003713 # None meaning the IncrementalNewlineDecoder takes unicode input
3714 # rather than bytes input
3715 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003716 'utf-16', 'utf-16-le', 'utf-16-be',
3717 'utf-32', 'utf-32-le', 'utf-32-be',
3718 )
3719 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003720 decoder = enc and codecs.getincrementaldecoder(enc)()
3721 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3722 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003723 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003724 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3725 self.check_newline_decoding_utf8(decoder)
Oren Milman1d1d3e92017-08-20 18:35:36 +03003726 self.assertRaises(TypeError, decoder.setstate, 42)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003727
Antoine Pitrou66913e22009-03-06 23:40:56 +00003728 def test_newline_bytes(self):
3729 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3730 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003731 self.assertEqual(dec.newlines, None)
3732 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3733 self.assertEqual(dec.newlines, None)
3734 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3735 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003736 dec = self.IncrementalNewlineDecoder(None, translate=False)
3737 _check(dec)
3738 dec = self.IncrementalNewlineDecoder(None, translate=True)
3739 _check(dec)
3740
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003741class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3742 pass
3743
3744class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3745 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003746
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003747
Guido van Rossum01a27522007-03-07 01:00:12 +00003748# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003749
Guido van Rossum5abbf752007-08-27 17:39:33 +00003750class MiscIOTest(unittest.TestCase):
3751
Barry Warsaw40e82462008-11-20 20:14:50 +00003752 def tearDown(self):
3753 support.unlink(support.TESTFN)
3754
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003755 def test___all__(self):
3756 for name in self.io.__all__:
3757 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003758 self.assertIsNotNone(obj, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00003759 if name == "open":
3760 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003761 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003762 self.assertTrue(issubclass(obj, Exception), name)
3763 elif not name.startswith("SEEK_"):
3764 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003765
Barry Warsaw40e82462008-11-20 20:14:50 +00003766 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003767 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003768 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003769 f.close()
3770
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003771 with support.check_warnings(('', DeprecationWarning)):
3772 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003773 self.assertEqual(f.name, support.TESTFN)
3774 self.assertEqual(f.buffer.name, support.TESTFN)
3775 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3776 self.assertEqual(f.mode, "U")
3777 self.assertEqual(f.buffer.mode, "rb")
3778 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003779 f.close()
3780
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003781 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003782 self.assertEqual(f.mode, "w+")
3783 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3784 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003785
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003786 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003787 self.assertEqual(g.mode, "wb")
3788 self.assertEqual(g.raw.mode, "wb")
3789 self.assertEqual(g.name, f.fileno())
3790 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003791 f.close()
3792 g.close()
3793
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003794 def test_io_after_close(self):
3795 for kwargs in [
3796 {"mode": "w"},
3797 {"mode": "wb"},
3798 {"mode": "w", "buffering": 1},
3799 {"mode": "w", "buffering": 2},
3800 {"mode": "wb", "buffering": 0},
3801 {"mode": "r"},
3802 {"mode": "rb"},
3803 {"mode": "r", "buffering": 1},
3804 {"mode": "r", "buffering": 2},
3805 {"mode": "rb", "buffering": 0},
3806 {"mode": "w+"},
3807 {"mode": "w+b"},
3808 {"mode": "w+", "buffering": 1},
3809 {"mode": "w+", "buffering": 2},
3810 {"mode": "w+b", "buffering": 0},
3811 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003812 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003813 f.close()
3814 self.assertRaises(ValueError, f.flush)
3815 self.assertRaises(ValueError, f.fileno)
3816 self.assertRaises(ValueError, f.isatty)
3817 self.assertRaises(ValueError, f.__iter__)
3818 if hasattr(f, "peek"):
3819 self.assertRaises(ValueError, f.peek, 1)
3820 self.assertRaises(ValueError, f.read)
3821 if hasattr(f, "read1"):
3822 self.assertRaises(ValueError, f.read1, 1024)
Martin Panterccb2c0e2016-10-20 23:48:14 +00003823 self.assertRaises(ValueError, f.read1)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003824 if hasattr(f, "readall"):
3825 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003826 if hasattr(f, "readinto"):
3827 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003828 if hasattr(f, "readinto1"):
3829 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003830 self.assertRaises(ValueError, f.readline)
3831 self.assertRaises(ValueError, f.readlines)
Xiang Zhang026435c2017-04-15 12:47:28 +08003832 self.assertRaises(ValueError, f.readlines, 1)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003833 self.assertRaises(ValueError, f.seek, 0)
3834 self.assertRaises(ValueError, f.tell)
3835 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003836 self.assertRaises(ValueError, f.write,
3837 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003838 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003839 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003840
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003841 def test_blockingioerror(self):
3842 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003843 class C(str):
3844 pass
3845 c = C("")
3846 b = self.BlockingIOError(1, c)
3847 c.b = b
3848 b.c = c
3849 wr = weakref.ref(c)
3850 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003851 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003852 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003853
3854 def test_abcs(self):
3855 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003856 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3857 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3858 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3859 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003860
3861 def _check_abc_inheritance(self, abcmodule):
3862 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003863 self.assertIsInstance(f, abcmodule.IOBase)
3864 self.assertIsInstance(f, abcmodule.RawIOBase)
3865 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3866 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003867 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003868 self.assertIsInstance(f, abcmodule.IOBase)
3869 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3870 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3871 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003872 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003873 self.assertIsInstance(f, abcmodule.IOBase)
3874 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3875 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3876 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003877
3878 def test_abc_inheritance(self):
3879 # Test implementations inherit from their respective ABCs
3880 self._check_abc_inheritance(self)
3881
3882 def test_abc_inheritance_official(self):
3883 # Test implementations inherit from the official ABCs of the
3884 # baseline "io" module.
3885 self._check_abc_inheritance(io)
3886
Antoine Pitroue033e062010-10-29 10:38:18 +00003887 def _check_warn_on_dealloc(self, *args, **kwargs):
3888 f = open(*args, **kwargs)
3889 r = repr(f)
3890 with self.assertWarns(ResourceWarning) as cm:
3891 f = None
3892 support.gc_collect()
3893 self.assertIn(r, str(cm.warning.args[0]))
3894
3895 def test_warn_on_dealloc(self):
3896 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3897 self._check_warn_on_dealloc(support.TESTFN, "wb")
3898 self._check_warn_on_dealloc(support.TESTFN, "w")
3899
3900 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3901 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003902 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003903 for fd in fds:
3904 try:
3905 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003906 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003907 if e.errno != errno.EBADF:
3908 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003909 self.addCleanup(cleanup_fds)
3910 r, w = os.pipe()
3911 fds += r, w
3912 self._check_warn_on_dealloc(r, *args, **kwargs)
3913 # When using closefd=False, there's no warning
3914 r, w = os.pipe()
3915 fds += r, w
Serhiy Storchaka94a619d2016-02-11 13:11:44 +02003916 with support.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00003917 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00003918
3919 def test_warn_on_dealloc_fd(self):
3920 self._check_warn_on_dealloc_fd("rb", buffering=0)
3921 self._check_warn_on_dealloc_fd("rb")
3922 self._check_warn_on_dealloc_fd("r")
3923
3924
Antoine Pitrou243757e2010-11-05 21:15:39 +00003925 def test_pickling(self):
3926 # Pickling file objects is forbidden
3927 for kwargs in [
3928 {"mode": "w"},
3929 {"mode": "wb"},
3930 {"mode": "wb", "buffering": 0},
3931 {"mode": "r"},
3932 {"mode": "rb"},
3933 {"mode": "rb", "buffering": 0},
3934 {"mode": "w+"},
3935 {"mode": "w+b"},
3936 {"mode": "w+b", "buffering": 0},
3937 ]:
3938 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3939 with self.open(support.TESTFN, **kwargs) as f:
3940 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3941
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003942 def test_nonblock_pipe_write_bigbuf(self):
3943 self._test_nonblock_pipe_write(16*1024)
3944
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003945 def test_nonblock_pipe_write_smallbuf(self):
3946 self._test_nonblock_pipe_write(1024)
3947
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003948 @unittest.skipUnless(hasattr(os, 'set_blocking'),
3949 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003950 def _test_nonblock_pipe_write(self, bufsize):
3951 sent = []
3952 received = []
3953 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003954 os.set_blocking(r, False)
3955 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003956
3957 # To exercise all code paths in the C implementation we need
3958 # to play with buffer sizes. For instance, if we choose a
3959 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3960 # then we will never get a partial write of the buffer.
3961 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3962 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3963
3964 with rf, wf:
3965 for N in 9999, 73, 7574:
3966 try:
3967 i = 0
3968 while True:
3969 msg = bytes([i % 26 + 97]) * N
3970 sent.append(msg)
3971 wf.write(msg)
3972 i += 1
3973
3974 except self.BlockingIOError as e:
3975 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003976 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003977 sent[-1] = sent[-1][:e.characters_written]
3978 received.append(rf.read())
3979 msg = b'BLOCKED'
3980 wf.write(msg)
3981 sent.append(msg)
3982
3983 while True:
3984 try:
3985 wf.flush()
3986 break
3987 except self.BlockingIOError as e:
3988 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003989 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003990 self.assertEqual(e.characters_written, 0)
3991 received.append(rf.read())
3992
3993 received += iter(rf.read, None)
3994
3995 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003996 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003997 self.assertTrue(wf.closed)
3998 self.assertTrue(rf.closed)
3999
Charles-François Natalidc3044c2012-01-09 22:40:02 +01004000 def test_create_fail(self):
4001 # 'x' mode fails if file is existing
4002 with self.open(support.TESTFN, 'w'):
4003 pass
4004 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
4005
4006 def test_create_writes(self):
4007 # 'x' mode opens for writing
4008 with self.open(support.TESTFN, 'xb') as f:
4009 f.write(b"spam")
4010 with self.open(support.TESTFN, 'rb') as f:
4011 self.assertEqual(b"spam", f.read())
4012
Christian Heimes7b648752012-09-10 14:48:43 +02004013 def test_open_allargs(self):
4014 # there used to be a buffer overflow in the parser for rawmode
4015 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
4016
4017
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004018class CMiscIOTest(MiscIOTest):
4019 io = io
4020
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03004021 def test_readinto_buffer_overflow(self):
4022 # Issue #18025
4023 class BadReader(self.io.BufferedIOBase):
4024 def read(self, n=-1):
4025 return b'x' * 10**6
4026 bufio = BadReader()
4027 b = bytearray(2)
4028 self.assertRaises(ValueError, bufio.readinto, b)
4029
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004030 def check_daemon_threads_shutdown_deadlock(self, stream_name):
4031 # Issue #23309: deadlocks at shutdown should be avoided when a
4032 # daemon thread and the main thread both write to a file.
4033 code = """if 1:
4034 import sys
4035 import time
4036 import threading
Victor Stinner2a1aed02017-04-21 17:59:23 +02004037 from test.support import SuppressCrashReport
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004038
4039 file = sys.{stream_name}
4040
4041 def run():
4042 while True:
4043 file.write('.')
4044 file.flush()
4045
Victor Stinner2a1aed02017-04-21 17:59:23 +02004046 crash = SuppressCrashReport()
4047 crash.__enter__()
4048 # don't call __exit__(): the crash occurs at Python shutdown
4049
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004050 thread = threading.Thread(target=run)
4051 thread.daemon = True
4052 thread.start()
4053
4054 time.sleep(0.5)
4055 file.write('!')
4056 file.flush()
4057 """.format_map(locals())
4058 res, _ = run_python_until_end("-c", code)
4059 err = res.err.decode()
4060 if res.rc != 0:
4061 # Failure: should be a fatal error
4062 self.assertIn("Fatal Python error: could not acquire lock "
4063 "for <_io.BufferedWriter name='<{stream_name}>'> "
4064 "at interpreter shutdown, possibly due to "
4065 "daemon threads".format_map(locals()),
4066 err)
4067 else:
4068 self.assertFalse(err.strip('.!'))
4069
4070 def test_daemon_threads_shutdown_stdout_deadlock(self):
4071 self.check_daemon_threads_shutdown_deadlock('stdout')
4072
4073 def test_daemon_threads_shutdown_stderr_deadlock(self):
4074 self.check_daemon_threads_shutdown_deadlock('stderr')
4075
4076
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004077class PyMiscIOTest(MiscIOTest):
4078 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00004079
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004080
4081@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
4082class SignalsTest(unittest.TestCase):
4083
4084 def setUp(self):
4085 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
4086
4087 def tearDown(self):
4088 signal.signal(signal.SIGALRM, self.oldalrm)
4089
4090 def alarm_interrupt(self, sig, frame):
4091 1/0
4092
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004093 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
4094 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00004095 invokes the signal handler, and bubbles up the exception raised
4096 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004097 read_results = []
4098 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02004099 if hasattr(signal, 'pthread_sigmask'):
4100 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004101 s = os.read(r, 1)
4102 read_results.append(s)
4103 t = threading.Thread(target=_read)
4104 t.daemon = True
4105 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00004106 fdopen_kwargs["closefd"] = False
Victor Stinner9c182a12015-03-16 18:03:06 +01004107 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004108 try:
4109 wio = self.io.open(w, **fdopen_kwargs)
4110 t.start()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004111 # Fill the pipe enough that the write will be blocking.
4112 # It will be interrupted by the timer armed above. Since the
4113 # other thread has read one byte, the low-level write will
4114 # return with a successful (partial) result rather than an EINTR.
4115 # The buffered IO layer must check for pending signal
4116 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02004117 signal.alarm(1)
4118 try:
Victor Stinner9c182a12015-03-16 18:03:06 +01004119 self.assertRaises(ZeroDivisionError, wio.write, large_data)
Victor Stinner775b2dd2013-07-15 19:53:13 +02004120 finally:
4121 signal.alarm(0)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03004122 t.join()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004123 # We got one byte, get another one and check that it isn't a
4124 # repeat of the first one.
4125 read_results.append(os.read(r, 1))
4126 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
4127 finally:
4128 os.close(w)
4129 os.close(r)
4130 # This is deliberate. If we didn't close the file descriptor
4131 # before closing wio, wio would try to flush its internal
4132 # buffer, and block again.
4133 try:
4134 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004135 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004136 if e.errno != errno.EBADF:
4137 raise
4138
4139 def test_interrupted_write_unbuffered(self):
4140 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
4141
4142 def test_interrupted_write_buffered(self):
4143 self.check_interrupted_write(b"xy", b"xy", mode="wb")
4144
4145 def test_interrupted_write_text(self):
4146 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
4147
Brett Cannon31f59292011-02-21 19:29:56 +00004148 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004149 def check_reentrant_write(self, data, **fdopen_kwargs):
4150 def on_alarm(*args):
4151 # Will be called reentrantly from the same thread
4152 wio.write(data)
4153 1/0
4154 signal.signal(signal.SIGALRM, on_alarm)
4155 r, w = os.pipe()
4156 wio = self.io.open(w, **fdopen_kwargs)
4157 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004158 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004159 # Either the reentrant call to wio.write() fails with RuntimeError,
4160 # or the signal handler raises ZeroDivisionError.
4161 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
4162 while 1:
4163 for i in range(100):
4164 wio.write(data)
4165 wio.flush()
4166 # Make sure the buffer doesn't fill up and block further writes
4167 os.read(r, len(data) * 100)
4168 exc = cm.exception
4169 if isinstance(exc, RuntimeError):
4170 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
4171 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004172 signal.alarm(0)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004173 wio.close()
4174 os.close(r)
4175
4176 def test_reentrant_write_buffered(self):
4177 self.check_reentrant_write(b"xy", mode="wb")
4178
4179 def test_reentrant_write_text(self):
4180 self.check_reentrant_write("xy", mode="w", encoding="ascii")
4181
Antoine Pitrou707ce822011-02-25 21:24:11 +00004182 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
4183 """Check that a buffered read, when it gets interrupted (either
4184 returning a partial result or EINTR), properly invokes the signal
4185 handler and retries if the latter returned successfully."""
4186 r, w = os.pipe()
4187 fdopen_kwargs["closefd"] = False
4188 def alarm_handler(sig, frame):
4189 os.write(w, b"bar")
4190 signal.signal(signal.SIGALRM, alarm_handler)
4191 try:
4192 rio = self.io.open(r, **fdopen_kwargs)
4193 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004194 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004195 # Expected behaviour:
4196 # - first raw read() returns partial b"foo"
4197 # - second raw read() returns EINTR
4198 # - third raw read() returns b"bar"
4199 self.assertEqual(decode(rio.read(6)), "foobar")
4200 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004201 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004202 rio.close()
4203 os.close(w)
4204 os.close(r)
4205
Antoine Pitrou20db5112011-08-19 20:32:34 +02004206 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004207 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
4208 mode="rb")
4209
Antoine Pitrou20db5112011-08-19 20:32:34 +02004210 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004211 self.check_interrupted_read_retry(lambda x: x,
4212 mode="r")
4213
Antoine Pitrou707ce822011-02-25 21:24:11 +00004214 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
4215 """Check that a buffered write, when it gets interrupted (either
4216 returning a partial result or EINTR), properly invokes the signal
4217 handler and retries if the latter returned successfully."""
4218 select = support.import_module("select")
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004219
Antoine Pitrou707ce822011-02-25 21:24:11 +00004220 # A quantity that exceeds the buffer size of an anonymous pipe's
4221 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02004222 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00004223 r, w = os.pipe()
4224 fdopen_kwargs["closefd"] = False
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004225
Antoine Pitrou707ce822011-02-25 21:24:11 +00004226 # We need a separate thread to read from the pipe and allow the
4227 # write() to finish. This thread is started after the SIGALRM is
4228 # received (forcing a first EINTR in write()).
4229 read_results = []
4230 write_finished = False
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004231 error = None
Antoine Pitrou707ce822011-02-25 21:24:11 +00004232 def _read():
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004233 try:
4234 while not write_finished:
4235 while r in select.select([r], [], [], 1.0)[0]:
4236 s = os.read(r, 1024)
4237 read_results.append(s)
4238 except BaseException as exc:
4239 nonlocal error
4240 error = exc
Antoine Pitrou707ce822011-02-25 21:24:11 +00004241 t = threading.Thread(target=_read)
4242 t.daemon = True
4243 def alarm1(sig, frame):
4244 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004245 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004246 def alarm2(sig, frame):
4247 t.start()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004248
4249 large_data = item * N
Antoine Pitrou707ce822011-02-25 21:24:11 +00004250 signal.signal(signal.SIGALRM, alarm1)
4251 try:
4252 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004253 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004254 # Expected behaviour:
4255 # - first raw write() is partial (because of the limited pipe buffer
4256 # and the first alarm)
4257 # - second raw write() returns EINTR (because of the second alarm)
4258 # - subsequent write()s are successful (either partial or complete)
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004259 written = wio.write(large_data)
4260 self.assertEqual(N, written)
4261
Antoine Pitrou707ce822011-02-25 21:24:11 +00004262 wio.flush()
4263 write_finished = True
4264 t.join()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004265
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004266 self.assertIsNone(error)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004267 self.assertEqual(N, sum(len(x) for x in read_results))
4268 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004269 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004270 write_finished = True
4271 os.close(w)
4272 os.close(r)
4273 # This is deliberate. If we didn't close the file descriptor
4274 # before closing wio, wio would try to flush its internal
4275 # buffer, and could block (in case of failure).
4276 try:
4277 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004278 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00004279 if e.errno != errno.EBADF:
4280 raise
4281
Antoine Pitrou20db5112011-08-19 20:32:34 +02004282 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004283 self.check_interrupted_write_retry(b"x", mode="wb")
4284
Antoine Pitrou20db5112011-08-19 20:32:34 +02004285 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004286 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4287
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004288
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004289class CSignalsTest(SignalsTest):
4290 io = io
4291
4292class PySignalsTest(SignalsTest):
4293 io = pyio
4294
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004295 # Handling reentrancy issues would slow down _pyio even more, so the
4296 # tests are disabled.
4297 test_reentrant_write_buffered = None
4298 test_reentrant_write_text = None
4299
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004300
Ezio Melottidaa42c72013-03-23 16:30:16 +02004301def load_tests(*args):
Gregory P. Smith1bef9072015-04-14 13:24:34 -07004302 tests = (CIOTest, PyIOTest, APIMismatchTest,
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004303 CBufferedReaderTest, PyBufferedReaderTest,
4304 CBufferedWriterTest, PyBufferedWriterTest,
4305 CBufferedRWPairTest, PyBufferedRWPairTest,
4306 CBufferedRandomTest, PyBufferedRandomTest,
4307 StatefulIncrementalDecoderTest,
4308 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
4309 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004310 CMiscIOTest, PyMiscIOTest,
4311 CSignalsTest, PySignalsTest,
4312 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004313
4314 # Put the namespaces of the IO module we are testing and some useful mock
4315 # classes in the __dict__ of each test.
4316 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
benfogle9703f092017-11-10 16:03:40 -05004317 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
4318 SlowFlushRawIO)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004319 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
4320 c_io_ns = {name : getattr(io, name) for name in all_members}
4321 py_io_ns = {name : getattr(pyio, name) for name in all_members}
4322 globs = globals()
4323 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
4324 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
4325 # Avoid turning open into a bound method.
4326 py_io_ns["open"] = pyio.OpenWrapper
4327 for test in tests:
4328 if test.__name__.startswith("C"):
4329 for name, obj in c_io_ns.items():
4330 setattr(test, name, obj)
4331 elif test.__name__.startswith("Py"):
4332 for name, obj in py_io_ns.items():
4333 setattr(test, name, obj)
4334
Ezio Melottidaa42c72013-03-23 16:30:16 +02004335 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
4336 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004337
4338if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02004339 unittest.main()