blob: 343c5a444072841e63b67d6378b153d3d2996dee [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020031import threading
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000032import time
Guido van Rossum28524c72007-02-27 05:47:44 +000033import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000034import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020035import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020036from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000038from test import support
Berker Peksagce643912015-05-06 06:33:17 +030039from test.support.script_helper import assert_python_ok, run_python_until_end
Miss Islington (bot)a13b6542018-03-02 02:17:51 -080040from test.support import FakePath
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000041
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000042import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000043import io # C implementation of io
44import _pyio as pyio # Python implementation of io
Guido van Rossuma9e20242007-03-08 00:43:48 +000045
Martin Panter6bb91f32016-05-28 00:41:57 +000046try:
47 import ctypes
48except ImportError:
49 def byteslike(*pos, **kw):
50 return array.array("b", bytes(*pos, **kw))
51else:
52 def byteslike(*pos, **kw):
53 """Create a bytes-like object having no string or sequence methods"""
54 data = bytes(*pos, **kw)
55 obj = EmptyStruct()
56 ctypes.resize(obj, len(data))
57 memoryview(obj).cast("B")[:] = data
58 return obj
59 class EmptyStruct(ctypes.Structure):
60 pass
61
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000062def _default_chunk_size():
63 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000064 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000065 return f._CHUNK_SIZE
66
67
Antoine Pitrou328ec742010-09-14 18:37:24 +000068class MockRawIOWithoutRead:
69 """A RawIO implementation without read(), so as to exercise the default
70 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000071
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000072 def __init__(self, read_stack=()):
73 self._read_stack = list(read_stack)
74 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000075 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000076 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000077
Guido van Rossum01a27522007-03-07 01:00:12 +000078 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000079 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000080 return len(b)
81
82 def writable(self):
83 return True
84
Guido van Rossum68bbcd22007-02-27 17:19:33 +000085 def fileno(self):
86 return 42
87
88 def readable(self):
89 return True
90
Guido van Rossum01a27522007-03-07 01:00:12 +000091 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000092 return True
93
Guido van Rossum01a27522007-03-07 01:00:12 +000094 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000095 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000096
97 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000098 return 0 # same comment as above
99
100 def readinto(self, buf):
101 self._reads += 1
102 max_len = len(buf)
103 try:
104 data = self._read_stack[0]
105 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000106 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000107 return 0
108 if data is None:
109 del self._read_stack[0]
110 return None
111 n = len(data)
112 if len(data) <= max_len:
113 del self._read_stack[0]
114 buf[:n] = data
115 return n
116 else:
117 buf[:] = data[:max_len]
118 self._read_stack[0] = data[max_len:]
119 return max_len
120
121 def truncate(self, pos=None):
122 return pos
123
Antoine Pitrou328ec742010-09-14 18:37:24 +0000124class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
125 pass
126
127class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
128 pass
129
130
131class MockRawIO(MockRawIOWithoutRead):
132
133 def read(self, n=None):
134 self._reads += 1
135 try:
136 return self._read_stack.pop(0)
137 except:
138 self._extraneous_reads += 1
139 return b""
140
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000141class CMockRawIO(MockRawIO, io.RawIOBase):
142 pass
143
144class PyMockRawIO(MockRawIO, pyio.RawIOBase):
145 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000146
Guido van Rossuma9e20242007-03-08 00:43:48 +0000147
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000148class MisbehavedRawIO(MockRawIO):
149 def write(self, b):
150 return super().write(b) * 2
151
152 def read(self, n=None):
153 return super().read(n) * 2
154
155 def seek(self, pos, whence):
156 return -123
157
158 def tell(self):
159 return -456
160
161 def readinto(self, buf):
162 super().readinto(buf)
163 return len(buf) * 5
164
165class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
166 pass
167
168class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
169 pass
170
171
benfogle9703f092017-11-10 16:03:40 -0500172class SlowFlushRawIO(MockRawIO):
173 def __init__(self):
174 super().__init__()
175 self.in_flush = threading.Event()
176
177 def flush(self):
178 self.in_flush.set()
179 time.sleep(0.25)
180
181class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
182 pass
183
184class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
185 pass
186
187
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000188class CloseFailureIO(MockRawIO):
189 closed = 0
190
191 def close(self):
192 if not self.closed:
193 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200194 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000195
196class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
197 pass
198
199class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
200 pass
201
202
203class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000204
205 def __init__(self, data):
206 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000207 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000208
209 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000210 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000211 self.read_history.append(None if res is None else len(res))
212 return res
213
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000214 def readinto(self, b):
215 res = super().readinto(b)
216 self.read_history.append(res)
217 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000218
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000219class CMockFileIO(MockFileIO, io.BytesIO):
220 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000221
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000222class PyMockFileIO(MockFileIO, pyio.BytesIO):
223 pass
224
225
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000226class MockUnseekableIO:
227 def seekable(self):
228 return False
229
230 def seek(self, *args):
231 raise self.UnsupportedOperation("not seekable")
232
233 def tell(self, *args):
234 raise self.UnsupportedOperation("not seekable")
235
Martin Panter754aab22016-03-31 07:21:56 +0000236 def truncate(self, *args):
237 raise self.UnsupportedOperation("not seekable")
238
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000239class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
240 UnsupportedOperation = io.UnsupportedOperation
241
242class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
243 UnsupportedOperation = pyio.UnsupportedOperation
244
245
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000246class MockNonBlockWriterIO:
247
248 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000249 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000250 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000251
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000252 def pop_written(self):
253 s = b"".join(self._write_stack)
254 self._write_stack[:] = []
255 return s
256
257 def block_on(self, char):
258 """Block when a given char is encountered."""
259 self._blocker_char = char
260
261 def readable(self):
262 return True
263
264 def seekable(self):
265 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000266
Guido van Rossum01a27522007-03-07 01:00:12 +0000267 def writable(self):
268 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000269
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000270 def write(self, b):
271 b = bytes(b)
272 n = -1
273 if self._blocker_char:
274 try:
275 n = b.index(self._blocker_char)
276 except ValueError:
277 pass
278 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100279 if n > 0:
280 # write data up to the first blocker
281 self._write_stack.append(b[:n])
282 return n
283 else:
284 # cancel blocker and indicate would block
285 self._blocker_char = None
286 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000287 self._write_stack.append(b)
288 return len(b)
289
290class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
291 BlockingIOError = io.BlockingIOError
292
293class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
294 BlockingIOError = pyio.BlockingIOError
295
Guido van Rossuma9e20242007-03-08 00:43:48 +0000296
Guido van Rossum28524c72007-02-27 05:47:44 +0000297class IOTest(unittest.TestCase):
298
Neal Norwitze7789b12008-03-24 06:18:09 +0000299 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000300 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000301
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000302 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000303 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000304
Guido van Rossum28524c72007-02-27 05:47:44 +0000305 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000306 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000307 f.truncate(0)
308 self.assertEqual(f.tell(), 5)
309 f.seek(0)
310
311 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000312 self.assertEqual(f.seek(0), 0)
313 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000314 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000315 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000316 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000317 buffer = bytearray(b" world\n\n\n")
318 self.assertEqual(f.write(buffer), 9)
319 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000320 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000321 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000322 self.assertEqual(f.seek(-1, 2), 13)
323 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000324
Guido van Rossum87429772007-04-10 21:06:59 +0000325 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000326 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000327 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000328
Guido van Rossum9b76da62007-04-11 01:09:03 +0000329 def read_ops(self, f, buffered=False):
330 data = f.read(5)
331 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000332 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000333 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000334 self.assertEqual(bytes(data), b" worl")
335 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000336 self.assertEqual(f.readinto(data), 2)
337 self.assertEqual(len(data), 5)
338 self.assertEqual(data[:2], b"d\n")
339 self.assertEqual(f.seek(0), 0)
340 self.assertEqual(f.read(20), b"hello world\n")
341 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000342 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000343 self.assertEqual(f.seek(-6, 2), 6)
344 self.assertEqual(f.read(5), b"world")
345 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000346 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000347 self.assertEqual(f.seek(-6, 1), 5)
348 self.assertEqual(f.read(5), b" worl")
349 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000350 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000351 if buffered:
352 f.seek(0)
353 self.assertEqual(f.read(), b"hello world\n")
354 f.seek(6)
355 self.assertEqual(f.read(), b"world\n")
356 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000357 f.seek(0)
358 data = byteslike(5)
359 self.assertEqual(f.readinto1(data), 5)
360 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000361
Guido van Rossum34d69e52007-04-10 20:08:41 +0000362 LARGE = 2**31
363
Guido van Rossum53807da2007-04-10 19:01:47 +0000364 def large_file_ops(self, f):
365 assert f.readable()
366 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100367 try:
368 self.assertEqual(f.seek(self.LARGE), self.LARGE)
369 except (OverflowError, ValueError):
370 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000371 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000372 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000373 self.assertEqual(f.tell(), self.LARGE + 3)
374 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000375 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000376 self.assertEqual(f.tell(), self.LARGE + 2)
377 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000378 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000379 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000380 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
381 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000382 self.assertEqual(f.read(2), b"x")
383
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000384 def test_invalid_operations(self):
385 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000386 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000387 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000388 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000389 self.assertRaises(exc, fp.read)
390 self.assertRaises(exc, fp.readline)
391 with self.open(support.TESTFN, "wb", buffering=0) as fp:
392 self.assertRaises(exc, fp.read)
393 self.assertRaises(exc, fp.readline)
394 with self.open(support.TESTFN, "rb", buffering=0) as fp:
395 self.assertRaises(exc, fp.write, b"blah")
396 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000397 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000398 self.assertRaises(exc, fp.write, b"blah")
399 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000400 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000401 self.assertRaises(exc, fp.write, "blah")
402 self.assertRaises(exc, fp.writelines, ["blah\n"])
403 # Non-zero seeking from current or end pos
404 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
405 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000406
Martin Panter754aab22016-03-31 07:21:56 +0000407 def test_optional_abilities(self):
408 # Test for OSError when optional APIs are not supported
409 # The purpose of this test is to try fileno(), reading, writing and
410 # seeking operations with various objects that indicate they do not
411 # support these operations.
412
413 def pipe_reader():
414 [r, w] = os.pipe()
415 os.close(w) # So that read() is harmless
416 return self.FileIO(r, "r")
417
418 def pipe_writer():
419 [r, w] = os.pipe()
420 self.addCleanup(os.close, r)
421 # Guarantee that we can write into the pipe without blocking
422 thread = threading.Thread(target=os.read, args=(r, 100))
423 thread.start()
424 self.addCleanup(thread.join)
425 return self.FileIO(w, "w")
426
427 def buffered_reader():
428 return self.BufferedReader(self.MockUnseekableIO())
429
430 def buffered_writer():
431 return self.BufferedWriter(self.MockUnseekableIO())
432
433 def buffered_random():
434 return self.BufferedRandom(self.BytesIO())
435
436 def buffered_rw_pair():
437 return self.BufferedRWPair(self.MockUnseekableIO(),
438 self.MockUnseekableIO())
439
440 def text_reader():
441 class UnseekableReader(self.MockUnseekableIO):
442 writable = self.BufferedIOBase.writable
443 write = self.BufferedIOBase.write
444 return self.TextIOWrapper(UnseekableReader(), "ascii")
445
446 def text_writer():
447 class UnseekableWriter(self.MockUnseekableIO):
448 readable = self.BufferedIOBase.readable
449 read = self.BufferedIOBase.read
450 return self.TextIOWrapper(UnseekableWriter(), "ascii")
451
452 tests = (
453 (pipe_reader, "fr"), (pipe_writer, "fw"),
454 (buffered_reader, "r"), (buffered_writer, "w"),
455 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
456 (text_reader, "r"), (text_writer, "w"),
457 (self.BytesIO, "rws"), (self.StringIO, "rws"),
458 )
459 for [test, abilities] in tests:
Martin Panter754aab22016-03-31 07:21:56 +0000460 with self.subTest(test), test() as obj:
461 readable = "r" in abilities
462 self.assertEqual(obj.readable(), readable)
463 writable = "w" in abilities
464 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000465
466 if isinstance(obj, self.TextIOBase):
467 data = "3"
468 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
469 data = b"3"
470 else:
471 self.fail("Unknown base class")
472
473 if "f" in abilities:
474 obj.fileno()
475 else:
476 self.assertRaises(OSError, obj.fileno)
477
478 if readable:
479 obj.read(1)
480 obj.read()
481 else:
482 self.assertRaises(OSError, obj.read, 1)
483 self.assertRaises(OSError, obj.read)
484
485 if writable:
486 obj.write(data)
487 else:
488 self.assertRaises(OSError, obj.write, data)
489
Martin Panter3ee147f2016-03-31 21:05:31 +0000490 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000491 pipe_reader, pipe_writer):
492 # Pipes seem to appear as seekable on Windows
493 continue
494 seekable = "s" in abilities
495 self.assertEqual(obj.seekable(), seekable)
496
Martin Panter754aab22016-03-31 07:21:56 +0000497 if seekable:
498 obj.tell()
499 obj.seek(0)
500 else:
501 self.assertRaises(OSError, obj.tell)
502 self.assertRaises(OSError, obj.seek, 0)
503
504 if writable and seekable:
505 obj.truncate()
506 obj.truncate(0)
507 else:
508 self.assertRaises(OSError, obj.truncate)
509 self.assertRaises(OSError, obj.truncate, 0)
510
Antoine Pitrou13348842012-01-29 18:36:34 +0100511 def test_open_handles_NUL_chars(self):
512 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300513 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100514
515 bytes_fn = bytes(fn_with_NUL, 'ascii')
516 with warnings.catch_warnings():
517 warnings.simplefilter("ignore", DeprecationWarning)
518 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100519
Guido van Rossum28524c72007-02-27 05:47:44 +0000520 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000521 with self.open(support.TESTFN, "wb", buffering=0) as f:
522 self.assertEqual(f.readable(), False)
523 self.assertEqual(f.writable(), True)
524 self.assertEqual(f.seekable(), True)
525 self.write_ops(f)
526 with self.open(support.TESTFN, "rb", buffering=0) as f:
527 self.assertEqual(f.readable(), True)
528 self.assertEqual(f.writable(), False)
529 self.assertEqual(f.seekable(), True)
530 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000531
Guido van Rossum87429772007-04-10 21:06:59 +0000532 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000533 with self.open(support.TESTFN, "wb") as f:
534 self.assertEqual(f.readable(), False)
535 self.assertEqual(f.writable(), True)
536 self.assertEqual(f.seekable(), True)
537 self.write_ops(f)
538 with self.open(support.TESTFN, "rb") as f:
539 self.assertEqual(f.readable(), True)
540 self.assertEqual(f.writable(), False)
541 self.assertEqual(f.seekable(), True)
542 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000543
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000544 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000545 with self.open(support.TESTFN, "wb") as f:
546 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
547 with self.open(support.TESTFN, "rb") as f:
548 self.assertEqual(f.readline(), b"abc\n")
549 self.assertEqual(f.readline(10), b"def\n")
550 self.assertEqual(f.readline(2), b"xy")
551 self.assertEqual(f.readline(4), b"zzy\n")
552 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000553 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000554 self.assertRaises(TypeError, f.readline, 5.3)
555 with self.open(support.TESTFN, "r") as f:
556 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000557
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300558 def test_readline_nonsizeable(self):
559 # Issue #30061
560 # Crash when readline() returns an object without __len__
561 class R(self.IOBase):
562 def readline(self):
563 return None
564 self.assertRaises((TypeError, StopIteration), next, R())
565
566 def test_next_nonsizeable(self):
567 # Issue #30061
568 # Crash when __next__() returns an object without __len__
569 class R(self.IOBase):
570 def __next__(self):
571 return None
572 self.assertRaises(TypeError, R().readlines, 1)
573
Guido van Rossum28524c72007-02-27 05:47:44 +0000574 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000575 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000576 self.write_ops(f)
577 data = f.getvalue()
578 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000579 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000580 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000581
Guido van Rossum53807da2007-04-10 19:01:47 +0000582 def test_large_file_ops(self):
Mike53f7a7c2017-12-14 14:04:53 +0300583 # On Windows and Mac OSX this test consumes large resources; It takes
Victor Stinner8c663fd2017-11-08 14:44:44 -0800584 # a long time to build the >2 GiB file and takes >2 GiB of disk space
Guido van Rossum34d69e52007-04-10 20:08:41 +0000585 # therefore the resource must be enabled to run this test.
586 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600587 support.requires(
588 'largefile',
589 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000590 with self.open(support.TESTFN, "w+b", 0) as f:
591 self.large_file_ops(f)
592 with self.open(support.TESTFN, "w+b") as f:
593 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000594
595 def test_with_open(self):
596 for bufsize in (0, 1, 100):
597 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000598 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000599 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000600 self.assertEqual(f.closed, True)
601 f = None
602 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000603 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000604 1/0
605 except ZeroDivisionError:
606 self.assertEqual(f.closed, True)
607 else:
608 self.fail("1/0 didn't raise an exception")
609
Antoine Pitrou08838b62009-01-21 00:55:13 +0000610 # issue 5008
611 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000612 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000613 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000614 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000615 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000616 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000617 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000618 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300619 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000620
Guido van Rossum87429772007-04-10 21:06:59 +0000621 def test_destructor(self):
622 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000623 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000624 def __del__(self):
625 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000626 try:
627 f = super().__del__
628 except AttributeError:
629 pass
630 else:
631 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000632 def close(self):
633 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000634 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000635 def flush(self):
636 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000637 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000638 with support.check_warnings(('', ResourceWarning)):
639 f = MyFileIO(support.TESTFN, "wb")
640 f.write(b"xxx")
641 del f
642 support.gc_collect()
643 self.assertEqual(record, [1, 2, 3])
644 with self.open(support.TESTFN, "rb") as f:
645 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000646
647 def _check_base_destructor(self, base):
648 record = []
649 class MyIO(base):
650 def __init__(self):
651 # This exercises the availability of attributes on object
652 # destruction.
653 # (in the C version, close() is called by the tp_dealloc
654 # function, not by __del__)
655 self.on_del = 1
656 self.on_close = 2
657 self.on_flush = 3
658 def __del__(self):
659 record.append(self.on_del)
660 try:
661 f = super().__del__
662 except AttributeError:
663 pass
664 else:
665 f()
666 def close(self):
667 record.append(self.on_close)
668 super().close()
669 def flush(self):
670 record.append(self.on_flush)
671 super().flush()
672 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000673 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000674 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000675 self.assertEqual(record, [1, 2, 3])
676
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000677 def test_IOBase_destructor(self):
678 self._check_base_destructor(self.IOBase)
679
680 def test_RawIOBase_destructor(self):
681 self._check_base_destructor(self.RawIOBase)
682
683 def test_BufferedIOBase_destructor(self):
684 self._check_base_destructor(self.BufferedIOBase)
685
686 def test_TextIOBase_destructor(self):
687 self._check_base_destructor(self.TextIOBase)
688
Guido van Rossum87429772007-04-10 21:06:59 +0000689 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000690 with self.open(support.TESTFN, "wb") as f:
691 f.write(b"xxx")
692 with self.open(support.TESTFN, "rb") as f:
693 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000694
Guido van Rossumd4103952007-04-12 05:44:49 +0000695 def test_array_writes(self):
696 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000697 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000698 def check(f):
699 with f:
700 self.assertEqual(f.write(a), n)
701 f.writelines((a,))
702 check(self.BytesIO())
703 check(self.FileIO(support.TESTFN, "w"))
704 check(self.BufferedWriter(self.MockRawIO()))
705 check(self.BufferedRandom(self.MockRawIO()))
706 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000707
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000708 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000709 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000710 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000711
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000712 def test_read_closed(self):
713 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000714 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000715 with self.open(support.TESTFN, "r") as f:
716 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000717 self.assertEqual(file.read(), "egg\n")
718 file.seek(0)
719 file.close()
720 self.assertRaises(ValueError, file.read)
721
722 def test_no_closefd_with_filename(self):
723 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000724 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000725
726 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000727 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000728 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000729 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000730 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000731 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000732 self.assertEqual(file.buffer.raw.closefd, False)
733
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000734 def test_garbage_collection(self):
735 # FileIO objects are collected, and collecting them flushes
736 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000737 with support.check_warnings(('', ResourceWarning)):
738 f = self.FileIO(support.TESTFN, "wb")
739 f.write(b"abcxxx")
740 f.f = f
741 wr = weakref.ref(f)
742 del f
743 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300744 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000745 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000746 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000747
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000748 def test_unbounded_file(self):
749 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
750 zero = "/dev/zero"
751 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000752 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000753 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000754 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000755 if support.real_max_memuse < support._2G:
Victor Stinner8c663fd2017-11-08 14:44:44 -0800756 self.skipTest("test requires at least 2 GiB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000757 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000758 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000759 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000760 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000761 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000762 self.assertRaises(OverflowError, f.read)
763
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200764 def check_flush_error_on_close(self, *args, **kwargs):
765 # Test that the file is closed despite failed flush
766 # and that flush() is called before file closed.
767 f = self.open(*args, **kwargs)
768 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000769 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200770 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200771 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000772 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200773 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600774 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200775 self.assertTrue(closed) # flush() called
776 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200777 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200778
779 def test_flush_error_on_close(self):
780 # raw file
781 # Issue #5700: io.FileIO calls flush() after file closed
782 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
783 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
784 self.check_flush_error_on_close(fd, 'wb', buffering=0)
785 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
786 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
787 os.close(fd)
788 # buffered io
789 self.check_flush_error_on_close(support.TESTFN, 'wb')
790 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
791 self.check_flush_error_on_close(fd, 'wb')
792 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
793 self.check_flush_error_on_close(fd, 'wb', closefd=False)
794 os.close(fd)
795 # text io
796 self.check_flush_error_on_close(support.TESTFN, 'w')
797 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
798 self.check_flush_error_on_close(fd, 'w')
799 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
800 self.check_flush_error_on_close(fd, 'w', closefd=False)
801 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000802
803 def test_multi_close(self):
804 f = self.open(support.TESTFN, "wb", buffering=0)
805 f.close()
806 f.close()
807 f.close()
808 self.assertRaises(ValueError, f.flush)
809
Antoine Pitrou328ec742010-09-14 18:37:24 +0000810 def test_RawIOBase_read(self):
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530811 # Exercise the default limited RawIOBase.read(n) implementation (which
812 # calls readinto() internally).
Antoine Pitrou328ec742010-09-14 18:37:24 +0000813 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
814 self.assertEqual(rawio.read(2), b"ab")
815 self.assertEqual(rawio.read(2), b"c")
816 self.assertEqual(rawio.read(2), b"d")
817 self.assertEqual(rawio.read(2), None)
818 self.assertEqual(rawio.read(2), b"ef")
819 self.assertEqual(rawio.read(2), b"g")
820 self.assertEqual(rawio.read(2), None)
821 self.assertEqual(rawio.read(2), b"")
822
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400823 def test_types_have_dict(self):
824 test = (
825 self.IOBase(),
826 self.RawIOBase(),
827 self.TextIOBase(),
828 self.StringIO(),
829 self.BytesIO()
830 )
831 for obj in test:
832 self.assertTrue(hasattr(obj, "__dict__"))
833
Ross Lagerwall59142db2011-10-31 20:34:46 +0200834 def test_opener(self):
835 with self.open(support.TESTFN, "w") as f:
836 f.write("egg\n")
837 fd = os.open(support.TESTFN, os.O_RDONLY)
838 def opener(path, flags):
839 return fd
840 with self.open("non-existent", "r", opener=opener) as f:
841 self.assertEqual(f.read(), "egg\n")
842
Barry Warsaw480e2852016-06-08 17:47:26 -0400843 def test_bad_opener_negative_1(self):
844 # Issue #27066.
845 def badopener(fname, flags):
846 return -1
847 with self.assertRaises(ValueError) as cm:
848 open('non-existent', 'r', opener=badopener)
849 self.assertEqual(str(cm.exception), 'opener returned -1')
850
851 def test_bad_opener_other_negative(self):
852 # Issue #27066.
853 def badopener(fname, flags):
854 return -2
855 with self.assertRaises(ValueError) as cm:
856 open('non-existent', 'r', opener=badopener)
857 self.assertEqual(str(cm.exception), 'opener returned -2')
858
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200859 def test_fileio_closefd(self):
860 # Issue #4841
861 with self.open(__file__, 'rb') as f1, \
862 self.open(__file__, 'rb') as f2:
863 fileio = self.FileIO(f1.fileno(), closefd=False)
864 # .__init__() must not close f1
865 fileio.__init__(f2.fileno(), closefd=False)
866 f1.readline()
867 # .close() must not close f2
868 fileio.close()
869 f2.readline()
870
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300871 def test_nonbuffered_textio(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200872 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300873 with self.assertRaises(ValueError):
874 self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300875
876 def test_invalid_newline(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200877 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300878 with self.assertRaises(ValueError):
879 self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300880
Martin Panter6bb91f32016-05-28 00:41:57 +0000881 def test_buffered_readinto_mixin(self):
882 # Test the implementation provided by BufferedIOBase
883 class Stream(self.BufferedIOBase):
884 def read(self, size):
885 return b"12345"
886 read1 = read
887 stream = Stream()
888 for method in ("readinto", "readinto1"):
889 with self.subTest(method):
890 buffer = byteslike(5)
891 self.assertEqual(getattr(stream, method)(buffer), 5)
892 self.assertEqual(bytes(buffer), b"12345")
893
Ethan Furmand62548a2016-06-04 14:38:43 -0700894 def test_fspath_support(self):
Ethan Furmand62548a2016-06-04 14:38:43 -0700895 def check_path_succeeds(path):
896 with self.open(path, "w") as f:
897 f.write("egg\n")
898
899 with self.open(path, "r") as f:
900 self.assertEqual(f.read(), "egg\n")
901
Miss Islington (bot)a13b6542018-03-02 02:17:51 -0800902 check_path_succeeds(FakePath(support.TESTFN))
903 check_path_succeeds(FakePath(support.TESTFN.encode('utf-8')))
Ethan Furmand62548a2016-06-04 14:38:43 -0700904
Miss Islington (bot)a13b6542018-03-02 02:17:51 -0800905 with self.open(support.TESTFN, "w") as f:
906 bad_path = FakePath(f.fileno())
907 with self.assertRaises(TypeError):
908 self.open(bad_path, 'w')
909
910 bad_path = FakePath(None)
Brett Cannonc78ca1e2016-06-24 12:03:43 -0700911 with self.assertRaises(TypeError):
Ethan Furmand62548a2016-06-04 14:38:43 -0700912 self.open(bad_path, 'w')
913
Miss Islington (bot)a13b6542018-03-02 02:17:51 -0800914 bad_path = FakePath(FloatingPointError)
915 with self.assertRaises(FloatingPointError):
916 self.open(bad_path, 'w')
917
Ethan Furmand62548a2016-06-04 14:38:43 -0700918 # ensure that refcounting is correct with some error conditions
919 with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
Miss Islington (bot)a13b6542018-03-02 02:17:51 -0800920 self.open(FakePath(support.TESTFN), 'rwxa')
Ethan Furmand62548a2016-06-04 14:38:43 -0700921
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530922 def test_RawIOBase_readall(self):
923 # Exercise the default unlimited RawIOBase.read() and readall()
924 # implementations.
925 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
926 self.assertEqual(rawio.read(), b"abcdefg")
927 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
928 self.assertEqual(rawio.readall(), b"abcdefg")
929
930 def test_BufferedIOBase_readinto(self):
931 # Exercise the default BufferedIOBase.readinto() and readinto1()
932 # implementations (which call read() or read1() internally).
933 class Reader(self.BufferedIOBase):
934 def __init__(self, avail):
935 self.avail = avail
936 def read(self, size):
937 result = self.avail[:size]
938 self.avail = self.avail[size:]
939 return result
940 def read1(self, size):
941 """Returns no more than 5 bytes at once"""
942 return self.read(min(size, 5))
943 tests = (
944 # (test method, total data available, read buffer size, expected
945 # read size)
946 ("readinto", 10, 5, 5),
947 ("readinto", 10, 6, 6), # More than read1() can return
948 ("readinto", 5, 6, 5), # Buffer larger than total available
949 ("readinto", 6, 7, 6),
950 ("readinto", 10, 0, 0), # Empty buffer
951 ("readinto1", 10, 5, 5), # Result limited to single read1() call
952 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return
953 ("readinto1", 5, 6, 5), # Buffer larger than total available
954 ("readinto1", 6, 7, 5),
955 ("readinto1", 10, 0, 0), # Empty buffer
956 )
957 UNUSED_BYTE = 0x81
958 for test in tests:
959 with self.subTest(test):
960 method, avail, request, result = test
961 reader = Reader(bytes(range(avail)))
962 buffer = bytearray((UNUSED_BYTE,) * request)
963 method = getattr(reader, method)
964 self.assertEqual(method(buffer), result)
965 self.assertEqual(len(buffer), request)
966 self.assertSequenceEqual(buffer[:result], range(result))
967 unused = (UNUSED_BYTE,) * (request - result)
968 self.assertSequenceEqual(buffer[result:], unused)
969 self.assertEqual(len(reader.avail), avail - result)
970
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200971
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000972class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200973
974 def test_IOBase_finalize(self):
975 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
976 # class which inherits IOBase and an object of this class are caught
977 # in a reference cycle and close() is already in the method cache.
978 class MyIO(self.IOBase):
979 def close(self):
980 pass
981
982 # create an instance to populate the method cache
983 MyIO()
984 obj = MyIO()
985 obj.obj = obj
986 wr = weakref.ref(obj)
987 del MyIO
988 del obj
989 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300990 self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000991
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000992class PyIOTest(IOTest):
993 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000994
Guido van Rossuma9e20242007-03-08 00:43:48 +0000995
Gregory P. Smith1bef9072015-04-14 13:24:34 -0700996@support.cpython_only
997class APIMismatchTest(unittest.TestCase):
Gregory P. Smith054b0652015-04-14 12:58:05 -0700998
Gregory P. Smith054b0652015-04-14 12:58:05 -0700999 def test_RawIOBase_io_in_pyio_match(self):
1000 """Test that pyio RawIOBase class has all c RawIOBase methods"""
Antoine Pitrou45d61562015-05-20 21:50:59 +02001001 mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
1002 ignore=('__weakref__',))
Gregory P. Smith054b0652015-04-14 12:58:05 -07001003 self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
1004
1005 def test_RawIOBase_pyio_in_io_match(self):
1006 """Test that c RawIOBase class has all pyio RawIOBase methods"""
1007 mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
1008 self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
1009
1010
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001011class CommonBufferedTests:
1012 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
1013
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001014 def test_detach(self):
1015 raw = self.MockRawIO()
1016 buf = self.tp(raw)
1017 self.assertIs(buf.detach(), raw)
1018 self.assertRaises(ValueError, buf.detach)
1019
Benjamin Peterson10e76b62014-12-21 20:51:50 -06001020 repr(buf) # Should still work
1021
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001022 def test_fileno(self):
1023 rawio = self.MockRawIO()
1024 bufio = self.tp(rawio)
1025
Ezio Melottib3aedd42010-11-20 19:04:17 +00001026 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001027
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001028 def test_invalid_args(self):
1029 rawio = self.MockRawIO()
1030 bufio = self.tp(rawio)
1031 # Invalid whence
1032 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +02001033 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001034
1035 def test_override_destructor(self):
1036 tp = self.tp
1037 record = []
1038 class MyBufferedIO(tp):
1039 def __del__(self):
1040 record.append(1)
1041 try:
1042 f = super().__del__
1043 except AttributeError:
1044 pass
1045 else:
1046 f()
1047 def close(self):
1048 record.append(2)
1049 super().close()
1050 def flush(self):
1051 record.append(3)
1052 super().flush()
1053 rawio = self.MockRawIO()
1054 bufio = MyBufferedIO(rawio)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001055 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001056 support.gc_collect()
Martin Panter754aab22016-03-31 07:21:56 +00001057 self.assertEqual(record, [1, 2, 3])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001058
1059 def test_context_manager(self):
1060 # Test usability as a context manager
1061 rawio = self.MockRawIO()
1062 bufio = self.tp(rawio)
1063 def _with():
1064 with bufio:
1065 pass
1066 _with()
1067 # bufio should now be closed, and using it a second time should raise
1068 # a ValueError.
1069 self.assertRaises(ValueError, _with)
1070
1071 def test_error_through_destructor(self):
1072 # Test that the exception state is not modified by a destructor,
1073 # even if close() fails.
1074 rawio = self.CloseFailureIO()
1075 def f():
1076 self.tp(rawio).xyzzy
1077 with support.captured_output("stderr") as s:
1078 self.assertRaises(AttributeError, f)
1079 s = s.getvalue().strip()
1080 if s:
1081 # The destructor *may* have printed an unraisable error, check it
1082 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001083 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001084 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +00001085
Antoine Pitrou716c4442009-05-23 19:04:03 +00001086 def test_repr(self):
1087 raw = self.MockRawIO()
1088 b = self.tp(raw)
Serhiy Storchaka521e5862014-07-22 15:00:37 +03001089 clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001090 self.assertEqual(repr(b), "<%s>" % clsname)
1091 raw.name = "dummy"
1092 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
1093 raw.name = b"dummy"
1094 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
1095
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02001096 def test_recursive_repr(self):
1097 # Issue #25455
1098 raw = self.MockRawIO()
1099 b = self.tp(raw)
1100 with support.swap_attr(raw, 'name', b):
1101 try:
1102 repr(b) # Should not crash
1103 except RuntimeError:
1104 pass
1105
Antoine Pitrou6be88762010-05-03 16:48:20 +00001106 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001107 # Test that buffered file is closed despite failed flush
1108 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00001109 raw = self.MockRawIO()
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001110 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00001111 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001112 closed[:] = [b.closed, raw.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001113 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00001114 raw.flush = bad_flush
1115 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001116 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001117 self.assertTrue(b.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001118 self.assertTrue(raw.closed)
1119 self.assertTrue(closed) # flush() called
1120 self.assertFalse(closed[0]) # flush() called before file closed
1121 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02001122 raw.flush = lambda: None # break reference loop
Benjamin Peterson68623612012-12-20 11:53:11 -06001123
1124 def test_close_error_on_close(self):
1125 raw = self.MockRawIO()
1126 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001127 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -06001128 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001129 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -06001130 raw.close = bad_close
1131 b = self.tp(raw)
1132 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001133 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001134 b.close()
1135 self.assertEqual(err.exception.args, ('close',))
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001136 self.assertIsInstance(err.exception.__context__, OSError)
Benjamin Peterson68623612012-12-20 11:53:11 -06001137 self.assertEqual(err.exception.__context__.args, ('flush',))
1138 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00001139
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001140 def test_nonnormalized_close_error_on_close(self):
1141 # Issue #21677
1142 raw = self.MockRawIO()
1143 def bad_flush():
1144 raise non_existing_flush
1145 def bad_close():
1146 raise non_existing_close
1147 raw.close = bad_close
1148 b = self.tp(raw)
1149 b.flush = bad_flush
1150 with self.assertRaises(NameError) as err: # exception not swallowed
1151 b.close()
1152 self.assertIn('non_existing_close', str(err.exception))
1153 self.assertIsInstance(err.exception.__context__, NameError)
1154 self.assertIn('non_existing_flush', str(err.exception.__context__))
1155 self.assertFalse(b.closed)
1156
Antoine Pitrou6be88762010-05-03 16:48:20 +00001157 def test_multi_close(self):
1158 raw = self.MockRawIO()
1159 b = self.tp(raw)
1160 b.close()
1161 b.close()
1162 b.close()
1163 self.assertRaises(ValueError, b.flush)
1164
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001165 def test_unseekable(self):
1166 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1167 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1168 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1169
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00001170 def test_readonly_attributes(self):
1171 raw = self.MockRawIO()
1172 buf = self.tp(raw)
1173 x = self.MockRawIO()
1174 with self.assertRaises(AttributeError):
1175 buf.raw = x
1176
Guido van Rossum78892e42007-04-06 17:31:18 +00001177
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001178class SizeofTest:
1179
1180 @support.cpython_only
1181 def test_sizeof(self):
1182 bufsize1 = 4096
1183 bufsize2 = 8192
1184 rawio = self.MockRawIO()
1185 bufio = self.tp(rawio, buffer_size=bufsize1)
1186 size = sys.getsizeof(bufio) - bufsize1
1187 rawio = self.MockRawIO()
1188 bufio = self.tp(rawio, buffer_size=bufsize2)
1189 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
1190
Jesus Ceadc469452012-10-04 12:37:56 +02001191 @support.cpython_only
1192 def test_buffer_freeing(self) :
1193 bufsize = 4096
1194 rawio = self.MockRawIO()
1195 bufio = self.tp(rawio, buffer_size=bufsize)
1196 size = sys.getsizeof(bufio) - bufsize
1197 bufio.close()
1198 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001199
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001200class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1201 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001202
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001203 def test_constructor(self):
1204 rawio = self.MockRawIO([b"abc"])
1205 bufio = self.tp(rawio)
1206 bufio.__init__(rawio)
1207 bufio.__init__(rawio, buffer_size=1024)
1208 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001209 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001210 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1211 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1212 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1213 rawio = self.MockRawIO([b"abc"])
1214 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001215 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001216
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001217 def test_uninitialized(self):
1218 bufio = self.tp.__new__(self.tp)
1219 del bufio
1220 bufio = self.tp.__new__(self.tp)
1221 self.assertRaisesRegex((ValueError, AttributeError),
1222 'uninitialized|has no attribute',
1223 bufio.read, 0)
1224 bufio.__init__(self.MockRawIO())
1225 self.assertEqual(bufio.read(0), b'')
1226
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001227 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001228 for arg in (None, 7):
1229 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1230 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001231 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001232 # Invalid args
1233 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001234
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001235 def test_read1(self):
1236 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1237 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001238 self.assertEqual(b"a", bufio.read(1))
1239 self.assertEqual(b"b", bufio.read1(1))
1240 self.assertEqual(rawio._reads, 1)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001241 self.assertEqual(b"", bufio.read1(0))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001242 self.assertEqual(b"c", bufio.read1(100))
1243 self.assertEqual(rawio._reads, 1)
1244 self.assertEqual(b"d", bufio.read1(100))
1245 self.assertEqual(rawio._reads, 2)
1246 self.assertEqual(b"efg", bufio.read1(100))
1247 self.assertEqual(rawio._reads, 3)
1248 self.assertEqual(b"", bufio.read1(100))
1249 self.assertEqual(rawio._reads, 4)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001250
1251 def test_read1_arbitrary(self):
1252 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1253 bufio = self.tp(rawio)
1254 self.assertEqual(b"a", bufio.read(1))
1255 self.assertEqual(b"bc", bufio.read1())
1256 self.assertEqual(b"d", bufio.read1())
1257 self.assertEqual(b"efg", bufio.read1(-1))
1258 self.assertEqual(rawio._reads, 3)
1259 self.assertEqual(b"", bufio.read1())
1260 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001261
1262 def test_readinto(self):
1263 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1264 bufio = self.tp(rawio)
1265 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001266 self.assertEqual(bufio.readinto(b), 2)
1267 self.assertEqual(b, b"ab")
1268 self.assertEqual(bufio.readinto(b), 2)
1269 self.assertEqual(b, b"cd")
1270 self.assertEqual(bufio.readinto(b), 2)
1271 self.assertEqual(b, b"ef")
1272 self.assertEqual(bufio.readinto(b), 1)
1273 self.assertEqual(b, b"gf")
1274 self.assertEqual(bufio.readinto(b), 0)
1275 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +02001276 rawio = self.MockRawIO((b"abc", None))
1277 bufio = self.tp(rawio)
1278 self.assertEqual(bufio.readinto(b), 2)
1279 self.assertEqual(b, b"ab")
1280 self.assertEqual(bufio.readinto(b), 1)
1281 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001282
Benjamin Petersona96fea02014-06-22 14:17:44 -07001283 def test_readinto1(self):
1284 buffer_size = 10
1285 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1286 bufio = self.tp(rawio, buffer_size=buffer_size)
1287 b = bytearray(2)
1288 self.assertEqual(bufio.peek(3), b'abc')
1289 self.assertEqual(rawio._reads, 1)
1290 self.assertEqual(bufio.readinto1(b), 2)
1291 self.assertEqual(b, b"ab")
1292 self.assertEqual(rawio._reads, 1)
1293 self.assertEqual(bufio.readinto1(b), 1)
1294 self.assertEqual(b[:1], b"c")
1295 self.assertEqual(rawio._reads, 1)
1296 self.assertEqual(bufio.readinto1(b), 2)
1297 self.assertEqual(b, b"de")
1298 self.assertEqual(rawio._reads, 2)
1299 b = bytearray(2*buffer_size)
1300 self.assertEqual(bufio.peek(3), b'fgh')
1301 self.assertEqual(rawio._reads, 3)
1302 self.assertEqual(bufio.readinto1(b), 6)
1303 self.assertEqual(b[:6], b"fghjkl")
1304 self.assertEqual(rawio._reads, 4)
1305
1306 def test_readinto_array(self):
1307 buffer_size = 60
1308 data = b"a" * 26
1309 rawio = self.MockRawIO((data,))
1310 bufio = self.tp(rawio, buffer_size=buffer_size)
1311
1312 # Create an array with element size > 1 byte
1313 b = array.array('i', b'x' * 32)
1314 assert len(b) != 16
1315
1316 # Read into it. We should get as many *bytes* as we can fit into b
1317 # (which is more than the number of elements)
1318 n = bufio.readinto(b)
1319 self.assertGreater(n, len(b))
1320
1321 # Check that old contents of b are preserved
1322 bm = memoryview(b).cast('B')
1323 self.assertLess(n, len(bm))
1324 self.assertEqual(bm[:n], data[:n])
1325 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1326
1327 def test_readinto1_array(self):
1328 buffer_size = 60
1329 data = b"a" * 26
1330 rawio = self.MockRawIO((data,))
1331 bufio = self.tp(rawio, buffer_size=buffer_size)
1332
1333 # Create an array with element size > 1 byte
1334 b = array.array('i', b'x' * 32)
1335 assert len(b) != 16
1336
1337 # Read into it. We should get as many *bytes* as we can fit into b
1338 # (which is more than the number of elements)
1339 n = bufio.readinto1(b)
1340 self.assertGreater(n, len(b))
1341
1342 # Check that old contents of b are preserved
1343 bm = memoryview(b).cast('B')
1344 self.assertLess(n, len(bm))
1345 self.assertEqual(bm[:n], data[:n])
1346 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1347
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001348 def test_readlines(self):
1349 def bufio():
1350 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1351 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001352 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1353 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1354 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001355
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001356 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001357 data = b"abcdefghi"
1358 dlen = len(data)
1359
1360 tests = [
1361 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1362 [ 100, [ 3, 3, 3], [ dlen ] ],
1363 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1364 ]
1365
1366 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001367 rawio = self.MockFileIO(data)
1368 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001369 pos = 0
1370 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001371 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001372 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001373 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001374 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001375
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001376 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +00001377 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001378 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1379 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001380 self.assertEqual(b"abcd", bufio.read(6))
1381 self.assertEqual(b"e", bufio.read(1))
1382 self.assertEqual(b"fg", bufio.read())
1383 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +02001384 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001385 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +00001386
Victor Stinnera80987f2011-05-25 22:47:16 +02001387 rawio = self.MockRawIO((b"a", None, None))
1388 self.assertEqual(b"a", rawio.readall())
1389 self.assertIsNone(rawio.readall())
1390
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001391 def test_read_past_eof(self):
1392 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1393 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001394
Ezio Melottib3aedd42010-11-20 19:04:17 +00001395 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001396
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001397 def test_read_all(self):
1398 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1399 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001400
Ezio Melottib3aedd42010-11-20 19:04:17 +00001401 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001402
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001403 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001404 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001405 try:
1406 # Write out many bytes with exactly the same number of 0's,
1407 # 1's... 255's. This will help us check that concurrent reading
1408 # doesn't duplicate or forget contents.
1409 N = 1000
1410 l = list(range(256)) * N
1411 random.shuffle(l)
1412 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001413 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +00001414 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001415 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001416 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001417 errors = []
1418 results = []
1419 def f():
1420 try:
1421 # Intra-buffer read then buffer-flushing read
1422 for n in cycle([1, 19]):
1423 s = bufio.read(n)
1424 if not s:
1425 break
1426 # list.append() is atomic
1427 results.append(s)
1428 except Exception as e:
1429 errors.append(e)
1430 raise
1431 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001432 with support.start_threads(threads):
1433 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001434 self.assertFalse(errors,
1435 "the following exceptions were caught: %r" % errors)
1436 s = b''.join(results)
1437 for i in range(256):
1438 c = bytes(bytearray([i]))
1439 self.assertEqual(s.count(c), N)
1440 finally:
1441 support.unlink(support.TESTFN)
1442
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001443 def test_unseekable(self):
1444 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1445 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1446 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1447 bufio.read(1)
1448 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1449 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1450
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001451 def test_misbehaved_io(self):
1452 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1453 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001454 self.assertRaises(OSError, bufio.seek, 0)
1455 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001456
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001457 def test_no_extraneous_read(self):
1458 # Issue #9550; when the raw IO object has satisfied the read request,
1459 # we should not issue any additional reads, otherwise it may block
1460 # (e.g. socket).
1461 bufsize = 16
1462 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1463 rawio = self.MockRawIO([b"x" * n])
1464 bufio = self.tp(rawio, bufsize)
1465 self.assertEqual(bufio.read(n), b"x" * n)
1466 # Simple case: one raw read is enough to satisfy the request.
1467 self.assertEqual(rawio._extraneous_reads, 0,
1468 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1469 # A more complex case where two raw reads are needed to satisfy
1470 # the request.
1471 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1472 bufio = self.tp(rawio, bufsize)
1473 self.assertEqual(bufio.read(n), b"x" * n)
1474 self.assertEqual(rawio._extraneous_reads, 0,
1475 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1476
Berker Peksagd10d6ae2015-05-12 17:01:05 +03001477 def test_read_on_closed(self):
1478 # Issue #23796
1479 b = io.BufferedReader(io.BytesIO(b"12"))
1480 b.read(1)
1481 b.close()
1482 self.assertRaises(ValueError, b.peek)
1483 self.assertRaises(ValueError, b.read1, 1)
1484
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001485
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001486class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001487 tp = io.BufferedReader
1488
1489 def test_constructor(self):
1490 BufferedReaderTest.test_constructor(self)
1491 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08001492 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001493 if sys.maxsize > 0x7FFFFFFF:
1494 rawio = self.MockRawIO()
1495 bufio = self.tp(rawio)
1496 self.assertRaises((OverflowError, MemoryError, ValueError),
1497 bufio.__init__, rawio, sys.maxsize)
1498
1499 def test_initialization(self):
1500 rawio = self.MockRawIO([b"abc"])
1501 bufio = self.tp(rawio)
1502 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1503 self.assertRaises(ValueError, bufio.read)
1504 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1505 self.assertRaises(ValueError, bufio.read)
1506 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1507 self.assertRaises(ValueError, bufio.read)
1508
1509 def test_misbehaved_io_read(self):
1510 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1511 bufio = self.tp(rawio)
1512 # _pyio.BufferedReader seems to implement reading different, so that
1513 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001514 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001515
1516 def test_garbage_collection(self):
1517 # C BufferedReader objects are collected.
1518 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001519 with support.check_warnings(('', ResourceWarning)):
1520 rawio = self.FileIO(support.TESTFN, "w+b")
1521 f = self.tp(rawio)
1522 f.f = f
1523 wr = weakref.ref(f)
1524 del f
1525 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001526 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001527
R David Murray67bfe802013-02-23 21:51:05 -05001528 def test_args_error(self):
1529 # Issue #17275
1530 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1531 self.tp(io.BytesIO(), 1024, 1024, 1024)
1532
1533
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001534class PyBufferedReaderTest(BufferedReaderTest):
1535 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001536
Guido van Rossuma9e20242007-03-08 00:43:48 +00001537
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001538class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1539 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001540
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001541 def test_constructor(self):
1542 rawio = self.MockRawIO()
1543 bufio = self.tp(rawio)
1544 bufio.__init__(rawio)
1545 bufio.__init__(rawio, buffer_size=1024)
1546 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001547 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001548 bufio.flush()
1549 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1550 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1551 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1552 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001553 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001554 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001555 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001556
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001557 def test_uninitialized(self):
1558 bufio = self.tp.__new__(self.tp)
1559 del bufio
1560 bufio = self.tp.__new__(self.tp)
1561 self.assertRaisesRegex((ValueError, AttributeError),
1562 'uninitialized|has no attribute',
1563 bufio.write, b'')
1564 bufio.__init__(self.MockRawIO())
1565 self.assertEqual(bufio.write(b''), 0)
1566
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001567 def test_detach_flush(self):
1568 raw = self.MockRawIO()
1569 buf = self.tp(raw)
1570 buf.write(b"howdy!")
1571 self.assertFalse(raw._write_stack)
1572 buf.detach()
1573 self.assertEqual(raw._write_stack, [b"howdy!"])
1574
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001575 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001576 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001577 writer = self.MockRawIO()
1578 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001579 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001580 self.assertFalse(writer._write_stack)
Martin Panter6bb91f32016-05-28 00:41:57 +00001581 buffer = bytearray(b"def")
1582 bufio.write(buffer)
1583 buffer[:] = b"***" # Overwrite our copy of the data
1584 bufio.flush()
1585 self.assertEqual(b"".join(writer._write_stack), b"abcdef")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001586
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001587 def test_write_overflow(self):
1588 writer = self.MockRawIO()
1589 bufio = self.tp(writer, 8)
1590 contents = b"abcdefghijklmnop"
1591 for n in range(0, len(contents), 3):
1592 bufio.write(contents[n:n+3])
1593 flushed = b"".join(writer._write_stack)
1594 # At least (total - 8) bytes were implicitly flushed, perhaps more
1595 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001596 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001597
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001598 def check_writes(self, intermediate_func):
1599 # Lots of writes, test the flushed output is as expected.
1600 contents = bytes(range(256)) * 1000
1601 n = 0
1602 writer = self.MockRawIO()
1603 bufio = self.tp(writer, 13)
1604 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1605 def gen_sizes():
1606 for size in count(1):
1607 for i in range(15):
1608 yield size
1609 sizes = gen_sizes()
1610 while n < len(contents):
1611 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001612 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001613 intermediate_func(bufio)
1614 n += size
1615 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001616 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001617
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001618 def test_writes(self):
1619 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001620
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001621 def test_writes_and_flushes(self):
1622 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001623
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001624 def test_writes_and_seeks(self):
1625 def _seekabs(bufio):
1626 pos = bufio.tell()
1627 bufio.seek(pos + 1, 0)
1628 bufio.seek(pos - 1, 0)
1629 bufio.seek(pos, 0)
1630 self.check_writes(_seekabs)
1631 def _seekrel(bufio):
1632 pos = bufio.seek(0, 1)
1633 bufio.seek(+1, 1)
1634 bufio.seek(-1, 1)
1635 bufio.seek(pos, 0)
1636 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001637
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001638 def test_writes_and_truncates(self):
1639 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001640
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001641 def test_write_non_blocking(self):
1642 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001643 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001644
Ezio Melottib3aedd42010-11-20 19:04:17 +00001645 self.assertEqual(bufio.write(b"abcd"), 4)
1646 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001647 # 1 byte will be written, the rest will be buffered
1648 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001649 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001650
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001651 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1652 raw.block_on(b"0")
1653 try:
1654 bufio.write(b"opqrwxyz0123456789")
1655 except self.BlockingIOError as e:
1656 written = e.characters_written
1657 else:
1658 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001659 self.assertEqual(written, 16)
1660 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001661 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001662
Ezio Melottib3aedd42010-11-20 19:04:17 +00001663 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001664 s = raw.pop_written()
1665 # Previously buffered bytes were flushed
1666 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001667
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001668 def test_write_and_rewind(self):
1669 raw = io.BytesIO()
1670 bufio = self.tp(raw, 4)
1671 self.assertEqual(bufio.write(b"abcdef"), 6)
1672 self.assertEqual(bufio.tell(), 6)
1673 bufio.seek(0, 0)
1674 self.assertEqual(bufio.write(b"XY"), 2)
1675 bufio.seek(6, 0)
1676 self.assertEqual(raw.getvalue(), b"XYcdef")
1677 self.assertEqual(bufio.write(b"123456"), 6)
1678 bufio.flush()
1679 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001680
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001681 def test_flush(self):
1682 writer = self.MockRawIO()
1683 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001684 bufio.write(b"abc")
1685 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001686 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001687
Antoine Pitrou131a4892012-10-16 22:57:11 +02001688 def test_writelines(self):
1689 l = [b'ab', b'cd', b'ef']
1690 writer = self.MockRawIO()
1691 bufio = self.tp(writer, 8)
1692 bufio.writelines(l)
1693 bufio.flush()
1694 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1695
1696 def test_writelines_userlist(self):
1697 l = UserList([b'ab', b'cd', b'ef'])
1698 writer = self.MockRawIO()
1699 bufio = self.tp(writer, 8)
1700 bufio.writelines(l)
1701 bufio.flush()
1702 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1703
1704 def test_writelines_error(self):
1705 writer = self.MockRawIO()
1706 bufio = self.tp(writer, 8)
1707 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1708 self.assertRaises(TypeError, bufio.writelines, None)
1709 self.assertRaises(TypeError, bufio.writelines, 'abc')
1710
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001711 def test_destructor(self):
1712 writer = self.MockRawIO()
1713 bufio = self.tp(writer, 8)
1714 bufio.write(b"abc")
1715 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001716 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001717 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001718
1719 def test_truncate(self):
1720 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001721 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001722 bufio = self.tp(raw, 8)
1723 bufio.write(b"abcdef")
1724 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001725 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001726 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001727 self.assertEqual(f.read(), b"abc")
1728
Nitish Chandra059f58c2018-01-28 21:30:09 +05301729 def test_truncate_after_write(self):
1730 # Ensure that truncate preserves the file position after
1731 # writes longer than the buffer size.
1732 # Issue: https://bugs.python.org/issue32228
1733 with self.open(support.TESTFN, "wb") as f:
1734 # Fill with some buffer
1735 f.write(b'\x00' * 10000)
1736 buffer_sizes = [8192, 4096, 200]
1737 for buffer_size in buffer_sizes:
1738 with self.open(support.TESTFN, "r+b", buffering=buffer_size) as f:
1739 f.write(b'\x00' * (buffer_size + 1))
1740 # After write write_pos and write_end are set to 0
1741 f.read(1)
1742 # read operation makes sure that pos != raw_pos
1743 f.truncate()
1744 self.assertEqual(f.tell(), buffer_size + 2)
1745
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001746 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001747 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001748 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001749 # Write out many bytes from many threads and test they were
1750 # all flushed.
1751 N = 1000
1752 contents = bytes(range(256)) * N
1753 sizes = cycle([1, 19])
1754 n = 0
1755 queue = deque()
1756 while n < len(contents):
1757 size = next(sizes)
1758 queue.append(contents[n:n+size])
1759 n += size
1760 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001761 # We use a real file object because it allows us to
1762 # exercise situations where the GIL is released before
1763 # writing the buffer to the raw streams. This is in addition
1764 # to concurrency issues due to switching threads in the middle
1765 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001766 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001767 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001768 errors = []
1769 def f():
1770 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001771 while True:
1772 try:
1773 s = queue.popleft()
1774 except IndexError:
1775 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001776 bufio.write(s)
1777 except Exception as e:
1778 errors.append(e)
1779 raise
1780 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001781 with support.start_threads(threads):
1782 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001783 self.assertFalse(errors,
1784 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001785 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001786 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001787 s = f.read()
1788 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001789 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001790 finally:
1791 support.unlink(support.TESTFN)
1792
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001793 def test_misbehaved_io(self):
1794 rawio = self.MisbehavedRawIO()
1795 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001796 self.assertRaises(OSError, bufio.seek, 0)
1797 self.assertRaises(OSError, bufio.tell)
1798 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001799
Florent Xicluna109d5732012-07-07 17:03:22 +02001800 def test_max_buffer_size_removal(self):
1801 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001802 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001803
Benjamin Peterson68623612012-12-20 11:53:11 -06001804 def test_write_error_on_close(self):
1805 raw = self.MockRawIO()
1806 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001807 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001808 raw.write = bad_write
1809 b = self.tp(raw)
1810 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001811 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001812 self.assertTrue(b.closed)
1813
benfogle9703f092017-11-10 16:03:40 -05001814 def test_slow_close_from_thread(self):
1815 # Issue #31976
1816 rawio = self.SlowFlushRawIO()
1817 bufio = self.tp(rawio, 8)
1818 t = threading.Thread(target=bufio.close)
1819 t.start()
1820 rawio.in_flush.wait()
1821 self.assertRaises(ValueError, bufio.write, b'spam')
1822 self.assertTrue(bufio.closed)
1823 t.join()
1824
1825
Benjamin Peterson59406a92009-03-26 17:10:29 +00001826
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001827class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001828 tp = io.BufferedWriter
1829
1830 def test_constructor(self):
1831 BufferedWriterTest.test_constructor(self)
1832 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08001833 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001834 if sys.maxsize > 0x7FFFFFFF:
1835 rawio = self.MockRawIO()
1836 bufio = self.tp(rawio)
1837 self.assertRaises((OverflowError, MemoryError, ValueError),
1838 bufio.__init__, rawio, sys.maxsize)
1839
1840 def test_initialization(self):
1841 rawio = self.MockRawIO()
1842 bufio = self.tp(rawio)
1843 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1844 self.assertRaises(ValueError, bufio.write, b"def")
1845 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1846 self.assertRaises(ValueError, bufio.write, b"def")
1847 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1848 self.assertRaises(ValueError, bufio.write, b"def")
1849
1850 def test_garbage_collection(self):
1851 # C BufferedWriter objects are collected, and collecting them flushes
1852 # all data to disk.
1853 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001854 with support.check_warnings(('', ResourceWarning)):
1855 rawio = self.FileIO(support.TESTFN, "w+b")
1856 f = self.tp(rawio)
1857 f.write(b"123xxx")
1858 f.x = f
1859 wr = weakref.ref(f)
1860 del f
1861 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001862 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001863 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001864 self.assertEqual(f.read(), b"123xxx")
1865
R David Murray67bfe802013-02-23 21:51:05 -05001866 def test_args_error(self):
1867 # Issue #17275
1868 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1869 self.tp(io.BytesIO(), 1024, 1024, 1024)
1870
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001871
1872class PyBufferedWriterTest(BufferedWriterTest):
1873 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001874
Guido van Rossum01a27522007-03-07 01:00:12 +00001875class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001876
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001877 def test_constructor(self):
1878 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001879 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001880
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001881 def test_uninitialized(self):
1882 pair = self.tp.__new__(self.tp)
1883 del pair
1884 pair = self.tp.__new__(self.tp)
1885 self.assertRaisesRegex((ValueError, AttributeError),
1886 'uninitialized|has no attribute',
1887 pair.read, 0)
1888 self.assertRaisesRegex((ValueError, AttributeError),
1889 'uninitialized|has no attribute',
1890 pair.write, b'')
1891 pair.__init__(self.MockRawIO(), self.MockRawIO())
1892 self.assertEqual(pair.read(0), b'')
1893 self.assertEqual(pair.write(b''), 0)
1894
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001895 def test_detach(self):
1896 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1897 self.assertRaises(self.UnsupportedOperation, pair.detach)
1898
Florent Xicluna109d5732012-07-07 17:03:22 +02001899 def test_constructor_max_buffer_size_removal(self):
1900 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001901 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001902
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001903 def test_constructor_with_not_readable(self):
1904 class NotReadable(MockRawIO):
1905 def readable(self):
1906 return False
1907
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001908 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001909
1910 def test_constructor_with_not_writeable(self):
1911 class NotWriteable(MockRawIO):
1912 def writable(self):
1913 return False
1914
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001915 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001916
1917 def test_read(self):
1918 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1919
1920 self.assertEqual(pair.read(3), b"abc")
1921 self.assertEqual(pair.read(1), b"d")
1922 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001923 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1924 self.assertEqual(pair.read(None), b"abc")
1925
1926 def test_readlines(self):
1927 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1928 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1929 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1930 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001931
1932 def test_read1(self):
1933 # .read1() is delegated to the underlying reader object, so this test
1934 # can be shallow.
1935 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1936
1937 self.assertEqual(pair.read1(3), b"abc")
Martin Panterccb2c0e2016-10-20 23:48:14 +00001938 self.assertEqual(pair.read1(), b"def")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001939
1940 def test_readinto(self):
Martin Panter6bb91f32016-05-28 00:41:57 +00001941 for method in ("readinto", "readinto1"):
1942 with self.subTest(method):
1943 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001944
Serhiy Storchaka5f1a5182016-09-11 14:41:02 +03001945 data = byteslike(b'\0' * 5)
Martin Panter6bb91f32016-05-28 00:41:57 +00001946 self.assertEqual(getattr(pair, method)(data), 5)
1947 self.assertEqual(bytes(data), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001948
1949 def test_write(self):
1950 w = self.MockRawIO()
1951 pair = self.tp(self.MockRawIO(), w)
1952
1953 pair.write(b"abc")
1954 pair.flush()
Martin Panter6bb91f32016-05-28 00:41:57 +00001955 buffer = bytearray(b"def")
1956 pair.write(buffer)
1957 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001958 pair.flush()
1959 self.assertEqual(w._write_stack, [b"abc", b"def"])
1960
1961 def test_peek(self):
1962 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1963
1964 self.assertTrue(pair.peek(3).startswith(b"abc"))
1965 self.assertEqual(pair.read(3), b"abc")
1966
1967 def test_readable(self):
1968 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1969 self.assertTrue(pair.readable())
1970
1971 def test_writeable(self):
1972 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1973 self.assertTrue(pair.writable())
1974
1975 def test_seekable(self):
1976 # BufferedRWPairs are never seekable, even if their readers and writers
1977 # are.
1978 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1979 self.assertFalse(pair.seekable())
1980
1981 # .flush() is delegated to the underlying writer object and has been
1982 # tested in the test_write method.
1983
1984 def test_close_and_closed(self):
1985 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1986 self.assertFalse(pair.closed)
1987 pair.close()
1988 self.assertTrue(pair.closed)
1989
Serhiy Storchaka7665be62015-03-24 23:21:57 +02001990 def test_reader_close_error_on_close(self):
1991 def reader_close():
1992 reader_non_existing
1993 reader = self.MockRawIO()
1994 reader.close = reader_close
1995 writer = self.MockRawIO()
1996 pair = self.tp(reader, writer)
1997 with self.assertRaises(NameError) as err:
1998 pair.close()
1999 self.assertIn('reader_non_existing', str(err.exception))
2000 self.assertTrue(pair.closed)
2001 self.assertFalse(reader.closed)
2002 self.assertTrue(writer.closed)
2003
2004 def test_writer_close_error_on_close(self):
2005 def writer_close():
2006 writer_non_existing
2007 reader = self.MockRawIO()
2008 writer = self.MockRawIO()
2009 writer.close = writer_close
2010 pair = self.tp(reader, writer)
2011 with self.assertRaises(NameError) as err:
2012 pair.close()
2013 self.assertIn('writer_non_existing', str(err.exception))
2014 self.assertFalse(pair.closed)
2015 self.assertTrue(reader.closed)
2016 self.assertFalse(writer.closed)
2017
2018 def test_reader_writer_close_error_on_close(self):
2019 def reader_close():
2020 reader_non_existing
2021 def writer_close():
2022 writer_non_existing
2023 reader = self.MockRawIO()
2024 reader.close = reader_close
2025 writer = self.MockRawIO()
2026 writer.close = writer_close
2027 pair = self.tp(reader, writer)
2028 with self.assertRaises(NameError) as err:
2029 pair.close()
2030 self.assertIn('reader_non_existing', str(err.exception))
2031 self.assertIsInstance(err.exception.__context__, NameError)
2032 self.assertIn('writer_non_existing', str(err.exception.__context__))
2033 self.assertFalse(pair.closed)
2034 self.assertFalse(reader.closed)
2035 self.assertFalse(writer.closed)
2036
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002037 def test_isatty(self):
2038 class SelectableIsAtty(MockRawIO):
2039 def __init__(self, isatty):
2040 MockRawIO.__init__(self)
2041 self._isatty = isatty
2042
2043 def isatty(self):
2044 return self._isatty
2045
2046 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
2047 self.assertFalse(pair.isatty())
2048
2049 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
2050 self.assertTrue(pair.isatty())
2051
2052 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
2053 self.assertTrue(pair.isatty())
2054
2055 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
2056 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00002057
Benjamin Petersonbbd0a322014-09-29 22:46:57 -04002058 def test_weakref_clearing(self):
2059 brw = self.tp(self.MockRawIO(), self.MockRawIO())
2060 ref = weakref.ref(brw)
2061 brw = None
2062 ref = None # Shouldn't segfault.
2063
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002064class CBufferedRWPairTest(BufferedRWPairTest):
2065 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002066
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002067class PyBufferedRWPairTest(BufferedRWPairTest):
2068 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002069
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002070
2071class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
2072 read_mode = "rb+"
2073 write_mode = "wb+"
2074
2075 def test_constructor(self):
2076 BufferedReaderTest.test_constructor(self)
2077 BufferedWriterTest.test_constructor(self)
2078
Serhiy Storchaka61e24932014-02-12 10:52:35 +02002079 def test_uninitialized(self):
2080 BufferedReaderTest.test_uninitialized(self)
2081 BufferedWriterTest.test_uninitialized(self)
2082
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002083 def test_read_and_write(self):
2084 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00002085 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00002086
2087 self.assertEqual(b"as", rw.read(2))
2088 rw.write(b"ddd")
2089 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002090 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002091 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002092 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00002093
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002094 def test_seek_and_tell(self):
2095 raw = self.BytesIO(b"asdfghjkl")
2096 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00002097
Ezio Melottib3aedd42010-11-20 19:04:17 +00002098 self.assertEqual(b"as", rw.read(2))
2099 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002100 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002101 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00002102
Antoine Pitroue05565e2011-08-20 14:39:23 +02002103 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00002104 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02002105 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002106 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002107 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002108 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002109 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002110 self.assertEqual(7, rw.tell())
2111 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02002112 rw.flush()
2113 self.assertEqual(b"asdf123fl", raw.getvalue())
2114
Christian Heimes8e42a0a2007-11-08 18:04:45 +00002115 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00002116
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002117 def check_flush_and_read(self, read_func):
2118 raw = self.BytesIO(b"abcdefghi")
2119 bufio = self.tp(raw)
2120
Ezio Melottib3aedd42010-11-20 19:04:17 +00002121 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002122 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002123 self.assertEqual(b"ef", read_func(bufio, 2))
2124 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002125 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002126 self.assertEqual(6, bufio.tell())
2127 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002128 raw.seek(0, 0)
2129 raw.write(b"XYZ")
2130 # flush() resets the read buffer
2131 bufio.flush()
2132 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002133 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002134
2135 def test_flush_and_read(self):
2136 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2137
2138 def test_flush_and_readinto(self):
2139 def _readinto(bufio, n=-1):
2140 b = bytearray(n if n >= 0 else 9999)
2141 n = bufio.readinto(b)
2142 return bytes(b[:n])
2143 self.check_flush_and_read(_readinto)
2144
2145 def test_flush_and_peek(self):
2146 def _peek(bufio, n=-1):
2147 # This relies on the fact that the buffer can contain the whole
2148 # raw stream, otherwise peek() can return less.
2149 b = bufio.peek(n)
2150 if n != -1:
2151 b = b[:n]
2152 bufio.seek(len(b), 1)
2153 return b
2154 self.check_flush_and_read(_peek)
2155
2156 def test_flush_and_write(self):
2157 raw = self.BytesIO(b"abcdefghi")
2158 bufio = self.tp(raw)
2159
2160 bufio.write(b"123")
2161 bufio.flush()
2162 bufio.write(b"45")
2163 bufio.flush()
2164 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002165 self.assertEqual(b"12345fghi", raw.getvalue())
2166 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002167
2168 def test_threads(self):
2169 BufferedReaderTest.test_threads(self)
2170 BufferedWriterTest.test_threads(self)
2171
2172 def test_writes_and_peek(self):
2173 def _peek(bufio):
2174 bufio.peek(1)
2175 self.check_writes(_peek)
2176 def _peek(bufio):
2177 pos = bufio.tell()
2178 bufio.seek(-1, 1)
2179 bufio.peek(1)
2180 bufio.seek(pos, 0)
2181 self.check_writes(_peek)
2182
2183 def test_writes_and_reads(self):
2184 def _read(bufio):
2185 bufio.seek(-1, 1)
2186 bufio.read(1)
2187 self.check_writes(_read)
2188
2189 def test_writes_and_read1s(self):
2190 def _read1(bufio):
2191 bufio.seek(-1, 1)
2192 bufio.read1(1)
2193 self.check_writes(_read1)
2194
2195 def test_writes_and_readintos(self):
2196 def _read(bufio):
2197 bufio.seek(-1, 1)
2198 bufio.readinto(bytearray(1))
2199 self.check_writes(_read)
2200
Antoine Pitroua0ceb732009-08-06 20:29:56 +00002201 def test_write_after_readahead(self):
2202 # Issue #6629: writing after the buffer was filled by readahead should
2203 # first rewind the raw stream.
2204 for overwrite_size in [1, 5]:
2205 raw = self.BytesIO(b"A" * 10)
2206 bufio = self.tp(raw, 4)
2207 # Trigger readahead
2208 self.assertEqual(bufio.read(1), b"A")
2209 self.assertEqual(bufio.tell(), 1)
2210 # Overwriting should rewind the raw stream if it needs so
2211 bufio.write(b"B" * overwrite_size)
2212 self.assertEqual(bufio.tell(), overwrite_size + 1)
2213 # If the write size was smaller than the buffer size, flush() and
2214 # check that rewind happens.
2215 bufio.flush()
2216 self.assertEqual(bufio.tell(), overwrite_size + 1)
2217 s = raw.getvalue()
2218 self.assertEqual(s,
2219 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2220
Antoine Pitrou7c404892011-05-13 00:13:33 +02002221 def test_write_rewind_write(self):
2222 # Various combinations of reading / writing / seeking backwards / writing again
2223 def mutate(bufio, pos1, pos2):
2224 assert pos2 >= pos1
2225 # Fill the buffer
2226 bufio.seek(pos1)
2227 bufio.read(pos2 - pos1)
2228 bufio.write(b'\x02')
2229 # This writes earlier than the previous write, but still inside
2230 # the buffer.
2231 bufio.seek(pos1)
2232 bufio.write(b'\x01')
2233
2234 b = b"\x80\x81\x82\x83\x84"
2235 for i in range(0, len(b)):
2236 for j in range(i, len(b)):
2237 raw = self.BytesIO(b)
2238 bufio = self.tp(raw, 100)
2239 mutate(bufio, i, j)
2240 bufio.flush()
2241 expected = bytearray(b)
2242 expected[j] = 2
2243 expected[i] = 1
2244 self.assertEqual(raw.getvalue(), expected,
2245 "failed result for i=%d, j=%d" % (i, j))
2246
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00002247 def test_truncate_after_read_or_write(self):
2248 raw = self.BytesIO(b"A" * 10)
2249 bufio = self.tp(raw, 100)
2250 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2251 self.assertEqual(bufio.truncate(), 2)
2252 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2253 self.assertEqual(bufio.truncate(), 4)
2254
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002255 def test_misbehaved_io(self):
2256 BufferedReaderTest.test_misbehaved_io(self)
2257 BufferedWriterTest.test_misbehaved_io(self)
2258
Antoine Pitroue05565e2011-08-20 14:39:23 +02002259 def test_interleaved_read_write(self):
2260 # Test for issue #12213
2261 with self.BytesIO(b'abcdefgh') as raw:
2262 with self.tp(raw, 100) as f:
2263 f.write(b"1")
2264 self.assertEqual(f.read(1), b'b')
2265 f.write(b'2')
2266 self.assertEqual(f.read1(1), b'd')
2267 f.write(b'3')
2268 buf = bytearray(1)
2269 f.readinto(buf)
2270 self.assertEqual(buf, b'f')
2271 f.write(b'4')
2272 self.assertEqual(f.peek(1), b'h')
2273 f.flush()
2274 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2275
2276 with self.BytesIO(b'abc') as raw:
2277 with self.tp(raw, 100) as f:
2278 self.assertEqual(f.read(1), b'a')
2279 f.write(b"2")
2280 self.assertEqual(f.read(1), b'c')
2281 f.flush()
2282 self.assertEqual(raw.getvalue(), b'a2c')
2283
2284 def test_interleaved_readline_write(self):
2285 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2286 with self.tp(raw) as f:
2287 f.write(b'1')
2288 self.assertEqual(f.readline(), b'b\n')
2289 f.write(b'2')
2290 self.assertEqual(f.readline(), b'def\n')
2291 f.write(b'3')
2292 self.assertEqual(f.readline(), b'\n')
2293 f.flush()
2294 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2295
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002296 # You can't construct a BufferedRandom over a non-seekable stream.
2297 test_unseekable = None
2298
R David Murray67bfe802013-02-23 21:51:05 -05002299
Antoine Pitrou10f0c502012-07-29 19:02:46 +02002300class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002301 tp = io.BufferedRandom
2302
2303 def test_constructor(self):
2304 BufferedRandomTest.test_constructor(self)
2305 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08002306 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002307 if sys.maxsize > 0x7FFFFFFF:
2308 rawio = self.MockRawIO()
2309 bufio = self.tp(rawio)
2310 self.assertRaises((OverflowError, MemoryError, ValueError),
2311 bufio.__init__, rawio, sys.maxsize)
2312
2313 def test_garbage_collection(self):
2314 CBufferedReaderTest.test_garbage_collection(self)
2315 CBufferedWriterTest.test_garbage_collection(self)
2316
R David Murray67bfe802013-02-23 21:51:05 -05002317 def test_args_error(self):
2318 # Issue #17275
2319 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
2320 self.tp(io.BytesIO(), 1024, 1024, 1024)
2321
2322
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002323class PyBufferedRandomTest(BufferedRandomTest):
2324 tp = pyio.BufferedRandom
2325
2326
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002327# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2328# properties:
2329# - A single output character can correspond to many bytes of input.
2330# - The number of input bytes to complete the character can be
2331# undetermined until the last input byte is received.
2332# - The number of input bytes can vary depending on previous input.
2333# - A single input byte can correspond to many characters of output.
2334# - The number of output characters can be undetermined until the
2335# last input byte is received.
2336# - The number of output characters can vary depending on previous input.
2337
2338class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
2339 """
2340 For testing seek/tell behavior with a stateful, buffering decoder.
2341
2342 Input is a sequence of words. Words may be fixed-length (length set
2343 by input) or variable-length (period-terminated). In variable-length
2344 mode, extra periods are ignored. Possible words are:
2345 - 'i' followed by a number sets the input length, I (maximum 99).
2346 When I is set to 0, words are space-terminated.
2347 - 'o' followed by a number sets the output length, O (maximum 99).
2348 - Any other word is converted into a word followed by a period on
2349 the output. The output word consists of the input word truncated
2350 or padded out with hyphens to make its length equal to O. If O
2351 is 0, the word is output verbatim without truncating or padding.
2352 I and O are initially set to 1. When I changes, any buffered input is
2353 re-scanned according to the new I. EOF also terminates the last word.
2354 """
2355
2356 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00002357 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002358 self.reset()
2359
2360 def __repr__(self):
2361 return '<SID %x>' % id(self)
2362
2363 def reset(self):
2364 self.i = 1
2365 self.o = 1
2366 self.buffer = bytearray()
2367
2368 def getstate(self):
2369 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
2370 return bytes(self.buffer), i*100 + o
2371
2372 def setstate(self, state):
2373 buffer, io = state
2374 self.buffer = bytearray(buffer)
2375 i, o = divmod(io, 100)
2376 self.i, self.o = i ^ 1, o ^ 1
2377
2378 def decode(self, input, final=False):
2379 output = ''
2380 for b in input:
2381 if self.i == 0: # variable-length, terminated with period
2382 if b == ord('.'):
2383 if self.buffer:
2384 output += self.process_word()
2385 else:
2386 self.buffer.append(b)
2387 else: # fixed-length, terminate after self.i bytes
2388 self.buffer.append(b)
2389 if len(self.buffer) == self.i:
2390 output += self.process_word()
2391 if final and self.buffer: # EOF terminates the last word
2392 output += self.process_word()
2393 return output
2394
2395 def process_word(self):
2396 output = ''
2397 if self.buffer[0] == ord('i'):
2398 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2399 elif self.buffer[0] == ord('o'):
2400 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2401 else:
2402 output = self.buffer.decode('ascii')
2403 if len(output) < self.o:
2404 output += '-'*self.o # pad out with hyphens
2405 if self.o:
2406 output = output[:self.o] # truncate to output length
2407 output += '.'
2408 self.buffer = bytearray()
2409 return output
2410
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002411 codecEnabled = False
2412
2413 @classmethod
2414 def lookupTestDecoder(cls, name):
2415 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00002416 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002417 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00002418 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002419 incrementalencoder=None,
2420 streamreader=None, streamwriter=None,
2421 incrementaldecoder=cls)
2422
2423# Register the previous decoder for testing.
2424# Disabled by default, tests will enable it.
2425codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2426
2427
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002428class StatefulIncrementalDecoderTest(unittest.TestCase):
2429 """
2430 Make sure the StatefulIncrementalDecoder actually works.
2431 """
2432
2433 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002434 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002435 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002436 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002437 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002438 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002439 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002440 # I=0, O=6 (variable-length input, fixed-length output)
2441 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2442 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002443 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002444 # I=6, O=3 (fixed-length input > fixed-length output)
2445 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2446 # I=0, then 3; O=29, then 15 (with longer output)
2447 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2448 'a----------------------------.' +
2449 'b----------------------------.' +
2450 'cde--------------------------.' +
2451 'abcdefghijabcde.' +
2452 'a.b------------.' +
2453 '.c.------------.' +
2454 'd.e------------.' +
2455 'k--------------.' +
2456 'l--------------.' +
2457 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002458 ]
2459
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002460 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002461 # Try a few one-shot test cases.
2462 for input, eof, output in self.test_cases:
2463 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002464 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002465
2466 # Also test an unfinished decode, followed by forcing EOF.
2467 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002468 self.assertEqual(d.decode(b'oiabcd'), '')
2469 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002470
2471class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002472
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002473 def setUp(self):
2474 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2475 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002476 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002477
Guido van Rossumd0712812007-04-11 16:32:43 +00002478 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002479 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002480
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002481 def test_constructor(self):
2482 r = self.BytesIO(b"\xc3\xa9\n\n")
2483 b = self.BufferedReader(r, 1000)
2484 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002485 t.__init__(b, encoding="latin-1", newline="\r\n")
2486 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002487 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002488 t.__init__(b, encoding="utf-8", line_buffering=True)
2489 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002490 self.assertEqual(t.line_buffering, True)
2491 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002492 self.assertRaises(TypeError, t.__init__, b, newline=42)
2493 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2494
Serhiy Storchakaf24131f2015-04-16 11:19:43 +03002495 def test_uninitialized(self):
2496 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2497 del t
2498 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2499 self.assertRaises(Exception, repr, t)
2500 self.assertRaisesRegex((ValueError, AttributeError),
2501 'uninitialized|has no attribute',
2502 t.read, 0)
2503 t.__init__(self.MockRawIO())
2504 self.assertEqual(t.read(0), '')
2505
Nick Coghlana9b15242014-02-04 22:11:18 +10002506 def test_non_text_encoding_codecs_are_rejected(self):
2507 # Ensure the constructor complains if passed a codec that isn't
2508 # marked as a text encoding
2509 # http://bugs.python.org/issue20404
2510 r = self.BytesIO()
2511 b = self.BufferedWriter(r)
2512 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2513 self.TextIOWrapper(b, encoding="hex")
2514
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002515 def test_detach(self):
2516 r = self.BytesIO()
2517 b = self.BufferedWriter(r)
2518 t = self.TextIOWrapper(b)
2519 self.assertIs(t.detach(), b)
2520
2521 t = self.TextIOWrapper(b, encoding="ascii")
2522 t.write("howdy")
2523 self.assertFalse(r.getvalue())
2524 t.detach()
2525 self.assertEqual(r.getvalue(), b"howdy")
2526 self.assertRaises(ValueError, t.detach)
2527
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002528 # Operations independent of the detached stream should still work
2529 repr(t)
2530 self.assertEqual(t.encoding, "ascii")
2531 self.assertEqual(t.errors, "strict")
2532 self.assertFalse(t.line_buffering)
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002533 self.assertFalse(t.write_through)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002534
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002535 def test_repr(self):
2536 raw = self.BytesIO("hello".encode("utf-8"))
2537 b = self.BufferedReader(raw)
2538 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002539 modname = self.TextIOWrapper.__module__
2540 self.assertEqual(repr(t),
2541 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2542 raw.name = "dummy"
2543 self.assertEqual(repr(t),
2544 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002545 t.mode = "r"
2546 self.assertEqual(repr(t),
2547 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002548 raw.name = b"dummy"
2549 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002550 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002551
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002552 t.buffer.detach()
2553 repr(t) # Should not raise an exception
2554
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02002555 def test_recursive_repr(self):
2556 # Issue #25455
2557 raw = self.BytesIO()
2558 t = self.TextIOWrapper(raw)
2559 with support.swap_attr(raw, 'name', t):
2560 try:
2561 repr(t) # Should not crash
2562 except RuntimeError:
2563 pass
2564
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002565 def test_line_buffering(self):
2566 r = self.BytesIO()
2567 b = self.BufferedWriter(r, 1000)
2568 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002569 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002570 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002571 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002572 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002573 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002574 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002575
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002576 def test_reconfigure_line_buffering(self):
2577 r = self.BytesIO()
2578 b = self.BufferedWriter(r, 1000)
2579 t = self.TextIOWrapper(b, newline="\n", line_buffering=False)
2580 t.write("AB\nC")
2581 self.assertEqual(r.getvalue(), b"")
2582
2583 t.reconfigure(line_buffering=True) # implicit flush
2584 self.assertEqual(r.getvalue(), b"AB\nC")
2585 t.write("DEF\nG")
2586 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2587 t.write("H")
2588 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2589 t.reconfigure(line_buffering=False) # implicit flush
2590 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2591 t.write("IJ")
2592 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2593
2594 # Keeping default value
2595 t.reconfigure()
2596 t.reconfigure(line_buffering=None)
2597 self.assertEqual(t.line_buffering, False)
2598 t.reconfigure(line_buffering=True)
2599 t.reconfigure()
2600 t.reconfigure(line_buffering=None)
2601 self.assertEqual(t.line_buffering, True)
2602
Victor Stinner91106cd2017-12-13 12:29:09 +01002603 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002604 def test_default_encoding(self):
2605 old_environ = dict(os.environ)
2606 try:
2607 # try to get a user preferred encoding different than the current
2608 # locale encoding to check that TextIOWrapper() uses the current
2609 # locale encoding and not the user preferred encoding
2610 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2611 if key in os.environ:
2612 del os.environ[key]
2613
2614 current_locale_encoding = locale.getpreferredencoding(False)
2615 b = self.BytesIO()
2616 t = self.TextIOWrapper(b)
2617 self.assertEqual(t.encoding, current_locale_encoding)
2618 finally:
2619 os.environ.clear()
2620 os.environ.update(old_environ)
2621
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002622 @support.cpython_only
Victor Stinner91106cd2017-12-13 12:29:09 +01002623 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Serhiy Storchaka78980432013-01-15 01:12:17 +02002624 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002625 # Issue 15989
2626 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002627 b = self.BytesIO()
2628 b.fileno = lambda: _testcapi.INT_MAX + 1
2629 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2630 b.fileno = lambda: _testcapi.UINT_MAX + 1
2631 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2632
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002633 def test_encoding(self):
2634 # Check the encoding attribute is always set, and valid
2635 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002636 t = self.TextIOWrapper(b, encoding="utf-8")
2637 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002638 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002639 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002640 codecs.lookup(t.encoding)
2641
2642 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002643 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002644 b = self.BytesIO(b"abc\n\xff\n")
2645 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002646 self.assertRaises(UnicodeError, t.read)
2647 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002648 b = self.BytesIO(b"abc\n\xff\n")
2649 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002650 self.assertRaises(UnicodeError, t.read)
2651 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002652 b = self.BytesIO(b"abc\n\xff\n")
2653 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002654 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002655 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002656 b = self.BytesIO(b"abc\n\xff\n")
2657 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002658 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002659
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002660 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002661 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002662 b = self.BytesIO()
2663 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002664 self.assertRaises(UnicodeError, t.write, "\xff")
2665 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002666 b = self.BytesIO()
2667 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002668 self.assertRaises(UnicodeError, t.write, "\xff")
2669 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002670 b = self.BytesIO()
2671 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002672 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002673 t.write("abc\xffdef\n")
2674 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002675 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002676 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002677 b = self.BytesIO()
2678 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002679 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002680 t.write("abc\xffdef\n")
2681 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002682 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002683
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002684 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002685 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2686
2687 tests = [
2688 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002689 [ '', input_lines ],
2690 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2691 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2692 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002693 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002694 encodings = (
2695 'utf-8', 'latin-1',
2696 'utf-16', 'utf-16-le', 'utf-16-be',
2697 'utf-32', 'utf-32-le', 'utf-32-be',
2698 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002699
Guido van Rossum8358db22007-08-18 21:39:55 +00002700 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002701 # character in TextIOWrapper._pending_line.
2702 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002703 # XXX: str.encode() should return bytes
2704 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002705 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002706 for bufsize in range(1, 10):
2707 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002708 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2709 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002710 encoding=encoding)
2711 if do_reads:
2712 got_lines = []
2713 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002714 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002715 if c2 == '':
2716 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002717 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002718 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002719 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002720 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002721
2722 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002723 self.assertEqual(got_line, exp_line)
2724 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002725
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002726 def test_newlines_input(self):
2727 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002728 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2729 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002730 (None, normalized.decode("ascii").splitlines(keepends=True)),
2731 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002732 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2733 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2734 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002735 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002736 buf = self.BytesIO(testdata)
2737 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002738 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002739 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002740 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002741
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002742 def test_newlines_output(self):
2743 testdict = {
2744 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2745 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2746 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2747 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2748 }
2749 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2750 for newline, expected in tests:
2751 buf = self.BytesIO()
2752 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2753 txt.write("AAA\nB")
2754 txt.write("BB\nCCC\n")
2755 txt.write("X\rY\r\nZ")
2756 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002757 self.assertEqual(buf.closed, False)
2758 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002759
2760 def test_destructor(self):
2761 l = []
2762 base = self.BytesIO
2763 class MyBytesIO(base):
2764 def close(self):
2765 l.append(self.getvalue())
2766 base.close(self)
2767 b = MyBytesIO()
2768 t = self.TextIOWrapper(b, encoding="ascii")
2769 t.write("abc")
2770 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002771 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002772 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002773
2774 def test_override_destructor(self):
2775 record = []
2776 class MyTextIO(self.TextIOWrapper):
2777 def __del__(self):
2778 record.append(1)
2779 try:
2780 f = super().__del__
2781 except AttributeError:
2782 pass
2783 else:
2784 f()
2785 def close(self):
2786 record.append(2)
2787 super().close()
2788 def flush(self):
2789 record.append(3)
2790 super().flush()
2791 b = self.BytesIO()
2792 t = MyTextIO(b, encoding="ascii")
2793 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002794 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002795 self.assertEqual(record, [1, 2, 3])
2796
2797 def test_error_through_destructor(self):
2798 # Test that the exception state is not modified by a destructor,
2799 # even if close() fails.
2800 rawio = self.CloseFailureIO()
2801 def f():
2802 self.TextIOWrapper(rawio).xyzzy
2803 with support.captured_output("stderr") as s:
2804 self.assertRaises(AttributeError, f)
2805 s = s.getvalue().strip()
2806 if s:
2807 # The destructor *may* have printed an unraisable error, check it
2808 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002809 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002810 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002811
Guido van Rossum9b76da62007-04-11 01:09:03 +00002812 # Systematic tests of the text I/O API
2813
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002814 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002815 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002816 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002817 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002818 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002819 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002820 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002821 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002822 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002823 self.assertEqual(f.tell(), 0)
2824 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002825 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002826 self.assertEqual(f.seek(0), 0)
2827 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002828 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002829 self.assertEqual(f.read(2), "ab")
2830 self.assertEqual(f.read(1), "c")
2831 self.assertEqual(f.read(1), "")
2832 self.assertEqual(f.read(), "")
2833 self.assertEqual(f.tell(), cookie)
2834 self.assertEqual(f.seek(0), 0)
2835 self.assertEqual(f.seek(0, 2), cookie)
2836 self.assertEqual(f.write("def"), 3)
2837 self.assertEqual(f.seek(cookie), cookie)
2838 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002839 if enc.startswith("utf"):
2840 self.multi_line_test(f, enc)
2841 f.close()
2842
2843 def multi_line_test(self, f, enc):
2844 f.seek(0)
2845 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002846 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002847 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002848 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002849 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002850 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002851 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002852 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002853 wlines.append((f.tell(), line))
2854 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002855 f.seek(0)
2856 rlines = []
2857 while True:
2858 pos = f.tell()
2859 line = f.readline()
2860 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002861 break
2862 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002863 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002864
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002865 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002866 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002867 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002868 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002869 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002870 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002871 p2 = f.tell()
2872 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002873 self.assertEqual(f.tell(), p0)
2874 self.assertEqual(f.readline(), "\xff\n")
2875 self.assertEqual(f.tell(), p1)
2876 self.assertEqual(f.readline(), "\xff\n")
2877 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002878 f.seek(0)
2879 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002880 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002881 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002882 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002883 f.close()
2884
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002885 def test_seeking(self):
2886 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002887 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002888 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002889 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002890 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002891 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002892 suffix = bytes(u_suffix.encode("utf-8"))
2893 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002894 with self.open(support.TESTFN, "wb") as f:
2895 f.write(line*2)
2896 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2897 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002898 self.assertEqual(s, str(prefix, "ascii"))
2899 self.assertEqual(f.tell(), prefix_size)
2900 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002901
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002902 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002903 # Regression test for a specific bug
2904 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002905 with self.open(support.TESTFN, "wb") as f:
2906 f.write(data)
2907 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2908 f._CHUNK_SIZE # Just test that it exists
2909 f._CHUNK_SIZE = 2
2910 f.readline()
2911 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002912
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002913 def test_seek_and_tell(self):
2914 #Test seek/tell using the StatefulIncrementalDecoder.
2915 # Make test faster by doing smaller seeks
2916 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002917
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002918 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002919 """Tell/seek to various points within a data stream and ensure
2920 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002921 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002922 f.write(data)
2923 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002924 f = self.open(support.TESTFN, encoding='test_decoder')
2925 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002926 decoded = f.read()
2927 f.close()
2928
Neal Norwitze2b07052008-03-18 19:52:05 +00002929 for i in range(min_pos, len(decoded) + 1): # seek positions
2930 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002931 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002932 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002933 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002934 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002935 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002936 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002937 f.close()
2938
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002939 # Enable the test decoder.
2940 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002941
2942 # Run the tests.
2943 try:
2944 # Try each test case.
2945 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002946 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002947
2948 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002949 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2950 offset = CHUNK_SIZE - len(input)//2
2951 prefix = b'.'*offset
2952 # Don't bother seeking into the prefix (takes too long).
2953 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002954 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002955
2956 # Ensure our test decoder won't interfere with subsequent tests.
2957 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002958 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002959
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002960 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002961 data = "1234567890"
2962 tests = ("utf-16",
2963 "utf-16-le",
2964 "utf-16-be",
2965 "utf-32",
2966 "utf-32-le",
2967 "utf-32-be")
2968 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002969 buf = self.BytesIO()
2970 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002971 # Check if the BOM is written only once (see issue1753).
2972 f.write(data)
2973 f.write(data)
2974 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002975 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002976 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002977 self.assertEqual(f.read(), data * 2)
2978 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002979
Benjamin Petersona1b49012009-03-31 23:11:32 +00002980 def test_unreadable(self):
2981 class UnReadable(self.BytesIO):
2982 def readable(self):
2983 return False
2984 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002985 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002986
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002987 def test_read_one_by_one(self):
2988 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002989 reads = ""
2990 while True:
2991 c = txt.read(1)
2992 if not c:
2993 break
2994 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002995 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002996
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002997 def test_readlines(self):
2998 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2999 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
3000 txt.seek(0)
3001 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
3002 txt.seek(0)
3003 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
3004
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003005 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003006 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003007 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003008 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003009 reads = ""
3010 while True:
3011 c = txt.read(128)
3012 if not c:
3013 break
3014 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003015 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003016
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02003017 def test_writelines(self):
3018 l = ['ab', 'cd', 'ef']
3019 buf = self.BytesIO()
3020 txt = self.TextIOWrapper(buf)
3021 txt.writelines(l)
3022 txt.flush()
3023 self.assertEqual(buf.getvalue(), b'abcdef')
3024
3025 def test_writelines_userlist(self):
3026 l = UserList(['ab', 'cd', 'ef'])
3027 buf = self.BytesIO()
3028 txt = self.TextIOWrapper(buf)
3029 txt.writelines(l)
3030 txt.flush()
3031 self.assertEqual(buf.getvalue(), b'abcdef')
3032
3033 def test_writelines_error(self):
3034 txt = self.TextIOWrapper(self.BytesIO())
3035 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3036 self.assertRaises(TypeError, txt.writelines, None)
3037 self.assertRaises(TypeError, txt.writelines, b'abc')
3038
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003039 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003040 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003041
3042 # read one char at a time
3043 reads = ""
3044 while True:
3045 c = txt.read(1)
3046 if not c:
3047 break
3048 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003049 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003050
3051 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003052 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003053 txt._CHUNK_SIZE = 4
3054
3055 reads = ""
3056 while True:
3057 c = txt.read(4)
3058 if not c:
3059 break
3060 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003061 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003062
3063 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003064 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003065 txt._CHUNK_SIZE = 4
3066
3067 reads = txt.read(4)
3068 reads += txt.read(4)
3069 reads += txt.readline()
3070 reads += txt.readline()
3071 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003072 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003073
3074 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003075 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003076 txt._CHUNK_SIZE = 4
3077
3078 reads = txt.read(4)
3079 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003080 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003081
3082 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003083 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003084 txt._CHUNK_SIZE = 4
3085
3086 reads = txt.read(4)
3087 pos = txt.tell()
3088 txt.seek(0)
3089 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003090 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003091
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003092 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003093 buffer = self.BytesIO(self.testdata)
3094 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003095
3096 self.assertEqual(buffer.seekable(), txt.seekable())
3097
Antoine Pitroue4501852009-05-14 18:55:55 +00003098 def test_append_bom(self):
3099 # The BOM is not written again when appending to a non-empty file
3100 filename = support.TESTFN
3101 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3102 with self.open(filename, 'w', encoding=charset) as f:
3103 f.write('aaa')
3104 pos = f.tell()
3105 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003106 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003107
3108 with self.open(filename, 'a', encoding=charset) as f:
3109 f.write('xxx')
3110 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003111 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003112
3113 def test_seek_bom(self):
3114 # Same test, but when seeking manually
3115 filename = support.TESTFN
3116 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3117 with self.open(filename, 'w', encoding=charset) as f:
3118 f.write('aaa')
3119 pos = f.tell()
3120 with self.open(filename, 'r+', encoding=charset) as f:
3121 f.seek(pos)
3122 f.write('zzz')
3123 f.seek(0)
3124 f.write('bbb')
3125 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003126 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003127
Antoine Pitrou85e3ee72015-04-13 20:01:21 +02003128 def test_seek_append_bom(self):
3129 # Same test, but first seek to the start and then to the end
3130 filename = support.TESTFN
3131 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3132 with self.open(filename, 'w', encoding=charset) as f:
3133 f.write('aaa')
3134 with self.open(filename, 'a', encoding=charset) as f:
3135 f.seek(0)
3136 f.seek(0, self.SEEK_END)
3137 f.write('xxx')
3138 with self.open(filename, 'rb') as f:
3139 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3140
Benjamin Peterson0926ad12009-06-06 18:02:12 +00003141 def test_errors_property(self):
3142 with self.open(support.TESTFN, "w") as f:
3143 self.assertEqual(f.errors, "strict")
3144 with self.open(support.TESTFN, "w", errors="replace") as f:
3145 self.assertEqual(f.errors, "replace")
3146
Brett Cannon31f59292011-02-21 19:29:56 +00003147 @support.no_tracing
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003148 def test_threads_write(self):
3149 # Issue6750: concurrent writes could duplicate data
3150 event = threading.Event()
3151 with self.open(support.TESTFN, "w", buffering=1) as f:
3152 def run(n):
3153 text = "Thread%03d\n" % n
3154 event.wait()
3155 f.write(text)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003156 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003157 for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003158 with support.start_threads(threads, event.set):
3159 time.sleep(0.02)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003160 with self.open(support.TESTFN) as f:
3161 content = f.read()
3162 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003163 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003164
Antoine Pitrou6be88762010-05-03 16:48:20 +00003165 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003166 # Test that text file is closed despite failed flush
3167 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00003168 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003169 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00003170 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003171 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003172 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00003173 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003174 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06003175 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003176 self.assertTrue(txt.buffer.closed)
3177 self.assertTrue(closed) # flush() called
3178 self.assertFalse(closed[0]) # flush() called before file closed
3179 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02003180 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003181
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003182 def test_close_error_on_close(self):
3183 buffer = self.BytesIO(self.testdata)
3184 def bad_flush():
3185 raise OSError('flush')
3186 def bad_close():
3187 raise OSError('close')
3188 buffer.close = bad_close
3189 txt = self.TextIOWrapper(buffer, encoding="ascii")
3190 txt.flush = bad_flush
3191 with self.assertRaises(OSError) as err: # exception not swallowed
3192 txt.close()
3193 self.assertEqual(err.exception.args, ('close',))
3194 self.assertIsInstance(err.exception.__context__, OSError)
3195 self.assertEqual(err.exception.__context__.args, ('flush',))
3196 self.assertFalse(txt.closed)
3197
3198 def test_nonnormalized_close_error_on_close(self):
3199 # Issue #21677
3200 buffer = self.BytesIO(self.testdata)
3201 def bad_flush():
3202 raise non_existing_flush
3203 def bad_close():
3204 raise non_existing_close
3205 buffer.close = bad_close
3206 txt = self.TextIOWrapper(buffer, encoding="ascii")
3207 txt.flush = bad_flush
3208 with self.assertRaises(NameError) as err: # exception not swallowed
3209 txt.close()
3210 self.assertIn('non_existing_close', str(err.exception))
3211 self.assertIsInstance(err.exception.__context__, NameError)
3212 self.assertIn('non_existing_flush', str(err.exception.__context__))
3213 self.assertFalse(txt.closed)
3214
Antoine Pitrou6be88762010-05-03 16:48:20 +00003215 def test_multi_close(self):
3216 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3217 txt.close()
3218 txt.close()
3219 txt.close()
3220 self.assertRaises(ValueError, txt.flush)
3221
Antoine Pitrou0d739d72010-09-05 23:01:12 +00003222 def test_unseekable(self):
3223 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
3224 self.assertRaises(self.UnsupportedOperation, txt.tell)
3225 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3226
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003227 def test_readonly_attributes(self):
3228 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3229 buf = self.BytesIO(self.testdata)
3230 with self.assertRaises(AttributeError):
3231 txt.buffer = buf
3232
Antoine Pitroue96ec682011-07-23 21:46:35 +02003233 def test_rawio(self):
3234 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3235 # that subprocess.Popen() can have the required unbuffered
3236 # semantics with universal_newlines=True.
3237 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3238 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3239 # Reads
3240 self.assertEqual(txt.read(4), 'abcd')
3241 self.assertEqual(txt.readline(), 'efghi\n')
3242 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3243
3244 def test_rawio_write_through(self):
3245 # Issue #12591: with write_through=True, writes don't need a flush
3246 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3247 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3248 write_through=True)
3249 txt.write('1')
3250 txt.write('23\n4')
3251 txt.write('5')
3252 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3253
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003254 def test_bufio_write_through(self):
3255 # Issue #21396: write_through=True doesn't force a flush()
3256 # on the underlying binary buffered object.
3257 flush_called, write_called = [], []
3258 class BufferedWriter(self.BufferedWriter):
3259 def flush(self, *args, **kwargs):
3260 flush_called.append(True)
3261 return super().flush(*args, **kwargs)
3262 def write(self, *args, **kwargs):
3263 write_called.append(True)
3264 return super().write(*args, **kwargs)
3265
3266 rawio = self.BytesIO()
3267 data = b"a"
3268 bufio = BufferedWriter(rawio, len(data)*2)
3269 textio = self.TextIOWrapper(bufio, encoding='ascii',
3270 write_through=True)
3271 # write to the buffered io but don't overflow the buffer
3272 text = data.decode('ascii')
3273 textio.write(text)
3274
3275 # buffer.flush is not called with write_through=True
3276 self.assertFalse(flush_called)
3277 # buffer.write *is* called with write_through=True
3278 self.assertTrue(write_called)
3279 self.assertEqual(rawio.getvalue(), b"") # no flush
3280
3281 write_called = [] # reset
3282 textio.write(text * 10) # total content is larger than bufio buffer
3283 self.assertTrue(write_called)
3284 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3285
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02003286 def test_reconfigure_write_through(self):
3287 raw = self.MockRawIO([])
3288 t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3289 t.write('1')
3290 t.reconfigure(write_through=True) # implied flush
3291 self.assertEqual(t.write_through, True)
3292 self.assertEqual(b''.join(raw._write_stack), b'1')
3293 t.write('23')
3294 self.assertEqual(b''.join(raw._write_stack), b'123')
3295 t.reconfigure(write_through=False)
3296 self.assertEqual(t.write_through, False)
3297 t.write('45')
3298 t.flush()
3299 self.assertEqual(b''.join(raw._write_stack), b'12345')
3300 # Keeping default value
3301 t.reconfigure()
3302 t.reconfigure(write_through=None)
3303 self.assertEqual(t.write_through, False)
3304 t.reconfigure(write_through=True)
3305 t.reconfigure()
3306 t.reconfigure(write_through=None)
3307 self.assertEqual(t.write_through, True)
3308
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003309 def test_read_nonbytes(self):
3310 # Issue #17106
3311 # Crash when underlying read() returns non-bytes
3312 t = self.TextIOWrapper(self.StringIO('a'))
3313 self.assertRaises(TypeError, t.read, 1)
3314 t = self.TextIOWrapper(self.StringIO('a'))
3315 self.assertRaises(TypeError, t.readline)
3316 t = self.TextIOWrapper(self.StringIO('a'))
3317 self.assertRaises(TypeError, t.read)
3318
Oren Milmana5b4ea12017-08-25 21:14:54 +03003319 def test_illegal_encoder(self):
3320 # Issue 31271: Calling write() while the return value of encoder's
3321 # encode() is invalid shouldn't cause an assertion failure.
3322 rot13 = codecs.lookup("rot13")
3323 with support.swap_attr(rot13, '_is_text_encoding', True):
3324 t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
3325 self.assertRaises(TypeError, t.write, 'bar')
3326
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003327 def test_illegal_decoder(self):
3328 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10003329 # Bypass the early encoding check added in issue 20404
3330 def _make_illegal_wrapper():
3331 quopri = codecs.lookup("quopri")
3332 quopri._is_text_encoding = True
3333 try:
3334 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3335 newline='\n', encoding="quopri")
3336 finally:
3337 quopri._is_text_encoding = False
3338 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003339 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10003340 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003341 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10003342 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003343 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10003344 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003345 self.assertRaises(TypeError, t.read)
3346
Oren Milmanba7d7362017-08-29 11:58:27 +03003347 # Issue 31243: calling read() while the return value of decoder's
3348 # getstate() is invalid should neither crash the interpreter nor
3349 # raise a SystemError.
3350 def _make_very_illegal_wrapper(getstate_ret_val):
3351 class BadDecoder:
3352 def getstate(self):
3353 return getstate_ret_val
3354 def _get_bad_decoder(dummy):
3355 return BadDecoder()
3356 quopri = codecs.lookup("quopri")
3357 with support.swap_attr(quopri, 'incrementaldecoder',
3358 _get_bad_decoder):
3359 return _make_illegal_wrapper()
3360 t = _make_very_illegal_wrapper(42)
3361 self.assertRaises(TypeError, t.read, 42)
3362 t = _make_very_illegal_wrapper(())
3363 self.assertRaises(TypeError, t.read, 42)
3364 t = _make_very_illegal_wrapper((1, 2))
3365 self.assertRaises(TypeError, t.read, 42)
3366
Antoine Pitrou712cb732013-12-21 15:51:54 +01003367 def _check_create_at_shutdown(self, **kwargs):
3368 # Issue #20037: creating a TextIOWrapper at shutdown
3369 # shouldn't crash the interpreter.
3370 iomod = self.io.__name__
3371 code = """if 1:
3372 import codecs
3373 import {iomod} as io
3374
3375 # Avoid looking up codecs at shutdown
3376 codecs.lookup('utf-8')
3377
3378 class C:
3379 def __init__(self):
3380 self.buf = io.BytesIO()
3381 def __del__(self):
3382 io.TextIOWrapper(self.buf, **{kwargs})
3383 print("ok")
3384 c = C()
3385 """.format(iomod=iomod, kwargs=kwargs)
3386 return assert_python_ok("-c", code)
3387
Serhiy Storchakaa7930372016-07-03 22:27:26 +03003388 @support.requires_type_collecting
Antoine Pitrou712cb732013-12-21 15:51:54 +01003389 def test_create_at_shutdown_without_encoding(self):
3390 rc, out, err = self._check_create_at_shutdown()
3391 if err:
3392 # Can error out with a RuntimeError if the module state
3393 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10003394 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01003395 else:
3396 self.assertEqual("ok", out.decode().strip())
3397
Serhiy Storchakaa7930372016-07-03 22:27:26 +03003398 @support.requires_type_collecting
Antoine Pitrou712cb732013-12-21 15:51:54 +01003399 def test_create_at_shutdown_with_encoding(self):
3400 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3401 errors='strict')
3402 self.assertFalse(err)
3403 self.assertEqual("ok", out.decode().strip())
3404
Antoine Pitroub8503892014-04-29 10:14:02 +02003405 def test_read_byteslike(self):
3406 r = MemviewBytesIO(b'Just some random string\n')
3407 t = self.TextIOWrapper(r, 'utf-8')
3408
3409 # TextIOwrapper will not read the full string, because
3410 # we truncate it to a multiple of the native int size
3411 # so that we can construct a more complex memoryview.
3412 bytes_val = _to_memoryview(r.getvalue()).tobytes()
3413
3414 self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3415
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003416 def test_issue22849(self):
3417 class F(object):
3418 def readable(self): return True
3419 def writable(self): return True
3420 def seekable(self): return True
3421
3422 for i in range(10):
3423 try:
3424 self.TextIOWrapper(F(), encoding='utf-8')
3425 except Exception:
3426 pass
3427
3428 F.tell = lambda x: 0
3429 t = self.TextIOWrapper(F(), encoding='utf-8')
3430
INADA Naoki507434f2017-12-21 09:59:53 +09003431 def test_reconfigure_encoding_read(self):
3432 # latin1 -> utf8
3433 # (latin1 can decode utf-8 encoded string)
3434 data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3435 raw = self.BytesIO(data)
3436 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3437 self.assertEqual(txt.readline(), 'abc\xe9\n')
3438 with self.assertRaises(self.UnsupportedOperation):
3439 txt.reconfigure(encoding='utf-8')
3440 with self.assertRaises(self.UnsupportedOperation):
3441 txt.reconfigure(newline=None)
3442
3443 def test_reconfigure_write_fromascii(self):
3444 # ascii has a specific encodefunc in the C implementation,
3445 # but utf-8-sig has not. Make sure that we get rid of the
3446 # cached encodefunc when we switch encoders.
3447 raw = self.BytesIO()
3448 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3449 txt.write('foo\n')
3450 txt.reconfigure(encoding='utf-8-sig')
3451 txt.write('\xe9\n')
3452 txt.flush()
3453 self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3454
3455 def test_reconfigure_write(self):
3456 # latin -> utf8
3457 raw = self.BytesIO()
3458 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3459 txt.write('abc\xe9\n')
3460 txt.reconfigure(encoding='utf-8')
3461 self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3462 txt.write('d\xe9f\n')
3463 txt.flush()
3464 self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3465
3466 # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3467 # the file
3468 raw = self.BytesIO()
3469 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3470 txt.write('abc\n')
3471 txt.reconfigure(encoding='utf-8-sig')
3472 txt.write('d\xe9f\n')
3473 txt.flush()
3474 self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3475
3476 def test_reconfigure_write_non_seekable(self):
3477 raw = self.BytesIO()
3478 raw.seekable = lambda: False
3479 raw.seek = None
3480 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3481 txt.write('abc\n')
3482 txt.reconfigure(encoding='utf-8-sig')
3483 txt.write('d\xe9f\n')
3484 txt.flush()
3485
3486 # If the raw stream is not seekable, there'll be a BOM
3487 self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3488
3489 def test_reconfigure_defaults(self):
3490 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
3491 txt.reconfigure(encoding=None)
3492 self.assertEqual(txt.encoding, 'ascii')
3493 self.assertEqual(txt.errors, 'replace')
3494 txt.write('LF\n')
3495
3496 txt.reconfigure(newline='\r\n')
3497 self.assertEqual(txt.encoding, 'ascii')
3498 self.assertEqual(txt.errors, 'replace')
3499
3500 txt.reconfigure(errors='ignore')
3501 self.assertEqual(txt.encoding, 'ascii')
3502 self.assertEqual(txt.errors, 'ignore')
3503 txt.write('CRLF\n')
3504
3505 txt.reconfigure(encoding='utf-8', newline=None)
3506 self.assertEqual(txt.errors, 'strict')
3507 txt.seek(0)
3508 self.assertEqual(txt.read(), 'LF\nCRLF\n')
3509
3510 self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3511
3512 def test_reconfigure_newline(self):
3513 raw = self.BytesIO(b'CR\rEOF')
3514 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3515 txt.reconfigure(newline=None)
3516 self.assertEqual(txt.readline(), 'CR\n')
3517 raw = self.BytesIO(b'CR\rEOF')
3518 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3519 txt.reconfigure(newline='')
3520 self.assertEqual(txt.readline(), 'CR\r')
3521 raw = self.BytesIO(b'CR\rLF\nEOF')
3522 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3523 txt.reconfigure(newline='\n')
3524 self.assertEqual(txt.readline(), 'CR\rLF\n')
3525 raw = self.BytesIO(b'LF\nCR\rEOF')
3526 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3527 txt.reconfigure(newline='\r')
3528 self.assertEqual(txt.readline(), 'LF\nCR\r')
3529 raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3530 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3531 txt.reconfigure(newline='\r\n')
3532 self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3533
3534 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3535 txt.reconfigure(newline=None)
3536 txt.write('linesep\n')
3537 txt.reconfigure(newline='')
3538 txt.write('LF\n')
3539 txt.reconfigure(newline='\n')
3540 txt.write('LF\n')
3541 txt.reconfigure(newline='\r')
3542 txt.write('CR\n')
3543 txt.reconfigure(newline='\r\n')
3544 txt.write('CRLF\n')
3545 expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3546 self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3547
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003548
Antoine Pitroub8503892014-04-29 10:14:02 +02003549class MemviewBytesIO(io.BytesIO):
3550 '''A BytesIO object whose read method returns memoryviews
3551 rather than bytes'''
3552
3553 def read1(self, len_):
3554 return _to_memoryview(super().read1(len_))
3555
3556 def read(self, len_):
3557 return _to_memoryview(super().read(len_))
3558
3559def _to_memoryview(buf):
3560 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3561
3562 arr = array.array('i')
3563 idx = len(buf) - len(buf) % arr.itemsize
3564 arr.frombytes(buf[:idx])
3565 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003566
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003567
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003568class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003569 io = io
Steve Dowerfe8f4c92016-09-07 09:31:52 -07003570 shutdown_error = "RuntimeError: could not find io module state"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003571
3572 def test_initialization(self):
3573 r = self.BytesIO(b"\xc3\xa9\n\n")
3574 b = self.BufferedReader(r, 1000)
3575 t = self.TextIOWrapper(b)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003576 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
3577 self.assertRaises(ValueError, t.read)
3578
Benjamin Peterson10e76b62014-12-21 20:51:50 -06003579 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
3580 self.assertRaises(Exception, repr, t)
3581
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003582 def test_garbage_collection(self):
3583 # C TextIOWrapper objects are collected, and collecting them flushes
3584 # all data to disk.
3585 # The Python version has __del__, so it ends in gc.garbage instead.
Antoine Pitrou796564c2013-07-30 19:59:21 +02003586 with support.check_warnings(('', ResourceWarning)):
3587 rawio = io.FileIO(support.TESTFN, "wb")
3588 b = self.BufferedWriter(rawio)
3589 t = self.TextIOWrapper(b, encoding="ascii")
3590 t.write("456def")
3591 t.x = t
3592 wr = weakref.ref(t)
3593 del t
3594 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003595 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00003596 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003597 self.assertEqual(f.read(), b"456def")
3598
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003599 def test_rwpair_cleared_before_textio(self):
3600 # Issue 13070: TextIOWrapper's finalization would crash when called
3601 # after the reference to the underlying BufferedRWPair's writer got
3602 # cleared by the GC.
3603 for i in range(1000):
3604 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3605 t1 = self.TextIOWrapper(b1, encoding="ascii")
3606 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3607 t2 = self.TextIOWrapper(b2, encoding="ascii")
3608 # circular references
3609 t1.buddy = t2
3610 t2.buddy = t1
3611 support.gc_collect()
3612
3613
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003614class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003615 io = pyio
Steve Dower6d5cf8a2016-09-09 09:21:01 -07003616 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003617
3618
3619class IncrementalNewlineDecoderTest(unittest.TestCase):
3620
3621 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003622 # UTF-8 specific tests for a newline decoder
3623 def _check_decode(b, s, **kwargs):
3624 # We exercise getstate() / setstate() as well as decode()
3625 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003626 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003627 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003628 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003629
Antoine Pitrou180a3362008-12-14 16:36:46 +00003630 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003631
Antoine Pitrou180a3362008-12-14 16:36:46 +00003632 _check_decode(b'\xe8', "")
3633 _check_decode(b'\xa2', "")
3634 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003635
Antoine Pitrou180a3362008-12-14 16:36:46 +00003636 _check_decode(b'\xe8', "")
3637 _check_decode(b'\xa2', "")
3638 _check_decode(b'\x88', "\u8888")
3639
3640 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003641 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3642
Antoine Pitrou180a3362008-12-14 16:36:46 +00003643 decoder.reset()
3644 _check_decode(b'\n', "\n")
3645 _check_decode(b'\r', "")
3646 _check_decode(b'', "\n", final=True)
3647 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003648
Antoine Pitrou180a3362008-12-14 16:36:46 +00003649 _check_decode(b'\r', "")
3650 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003651
Antoine Pitrou180a3362008-12-14 16:36:46 +00003652 _check_decode(b'\r\r\n', "\n\n")
3653 _check_decode(b'\r', "")
3654 _check_decode(b'\r', "\n")
3655 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003656
Antoine Pitrou180a3362008-12-14 16:36:46 +00003657 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3658 _check_decode(b'\xe8\xa2\x88', "\u8888")
3659 _check_decode(b'\n', "\n")
3660 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3661 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003662
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003663 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003664 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003665 if encoding is not None:
3666 encoder = codecs.getincrementalencoder(encoding)()
3667 def _decode_bytewise(s):
3668 # Decode one byte at a time
3669 for b in encoder.encode(s):
3670 result.append(decoder.decode(bytes([b])))
3671 else:
3672 encoder = None
3673 def _decode_bytewise(s):
3674 # Decode one char at a time
3675 for c in s:
3676 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003677 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003678 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003679 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003680 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003681 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003682 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003683 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003684 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003685 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003686 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003687 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003688 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003689 input = "abc"
3690 if encoder is not None:
3691 encoder.reset()
3692 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003693 self.assertEqual(decoder.decode(input), "abc")
3694 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003695
3696 def test_newline_decoder(self):
3697 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003698 # None meaning the IncrementalNewlineDecoder takes unicode input
3699 # rather than bytes input
3700 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003701 'utf-16', 'utf-16-le', 'utf-16-be',
3702 'utf-32', 'utf-32-le', 'utf-32-be',
3703 )
3704 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003705 decoder = enc and codecs.getincrementaldecoder(enc)()
3706 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3707 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003708 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003709 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3710 self.check_newline_decoding_utf8(decoder)
Oren Milman1d1d3e92017-08-20 18:35:36 +03003711 self.assertRaises(TypeError, decoder.setstate, 42)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003712
Antoine Pitrou66913e22009-03-06 23:40:56 +00003713 def test_newline_bytes(self):
3714 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3715 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003716 self.assertEqual(dec.newlines, None)
3717 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3718 self.assertEqual(dec.newlines, None)
3719 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3720 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003721 dec = self.IncrementalNewlineDecoder(None, translate=False)
3722 _check(dec)
3723 dec = self.IncrementalNewlineDecoder(None, translate=True)
3724 _check(dec)
3725
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003726class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3727 pass
3728
3729class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3730 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003731
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003732
Guido van Rossum01a27522007-03-07 01:00:12 +00003733# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003734
Guido van Rossum5abbf752007-08-27 17:39:33 +00003735class MiscIOTest(unittest.TestCase):
3736
Barry Warsaw40e82462008-11-20 20:14:50 +00003737 def tearDown(self):
3738 support.unlink(support.TESTFN)
3739
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003740 def test___all__(self):
3741 for name in self.io.__all__:
3742 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003743 self.assertIsNotNone(obj, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00003744 if name == "open":
3745 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003746 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003747 self.assertTrue(issubclass(obj, Exception), name)
3748 elif not name.startswith("SEEK_"):
3749 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003750
Barry Warsaw40e82462008-11-20 20:14:50 +00003751 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003752 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003753 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003754 f.close()
3755
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003756 with support.check_warnings(('', DeprecationWarning)):
3757 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003758 self.assertEqual(f.name, support.TESTFN)
3759 self.assertEqual(f.buffer.name, support.TESTFN)
3760 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3761 self.assertEqual(f.mode, "U")
3762 self.assertEqual(f.buffer.mode, "rb")
3763 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003764 f.close()
3765
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003766 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003767 self.assertEqual(f.mode, "w+")
3768 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3769 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003770
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003771 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003772 self.assertEqual(g.mode, "wb")
3773 self.assertEqual(g.raw.mode, "wb")
3774 self.assertEqual(g.name, f.fileno())
3775 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003776 f.close()
3777 g.close()
3778
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003779 def test_io_after_close(self):
3780 for kwargs in [
3781 {"mode": "w"},
3782 {"mode": "wb"},
3783 {"mode": "w", "buffering": 1},
3784 {"mode": "w", "buffering": 2},
3785 {"mode": "wb", "buffering": 0},
3786 {"mode": "r"},
3787 {"mode": "rb"},
3788 {"mode": "r", "buffering": 1},
3789 {"mode": "r", "buffering": 2},
3790 {"mode": "rb", "buffering": 0},
3791 {"mode": "w+"},
3792 {"mode": "w+b"},
3793 {"mode": "w+", "buffering": 1},
3794 {"mode": "w+", "buffering": 2},
3795 {"mode": "w+b", "buffering": 0},
3796 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003797 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003798 f.close()
3799 self.assertRaises(ValueError, f.flush)
3800 self.assertRaises(ValueError, f.fileno)
3801 self.assertRaises(ValueError, f.isatty)
3802 self.assertRaises(ValueError, f.__iter__)
3803 if hasattr(f, "peek"):
3804 self.assertRaises(ValueError, f.peek, 1)
3805 self.assertRaises(ValueError, f.read)
3806 if hasattr(f, "read1"):
3807 self.assertRaises(ValueError, f.read1, 1024)
Martin Panterccb2c0e2016-10-20 23:48:14 +00003808 self.assertRaises(ValueError, f.read1)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003809 if hasattr(f, "readall"):
3810 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003811 if hasattr(f, "readinto"):
3812 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003813 if hasattr(f, "readinto1"):
3814 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003815 self.assertRaises(ValueError, f.readline)
3816 self.assertRaises(ValueError, f.readlines)
Xiang Zhang026435c2017-04-15 12:47:28 +08003817 self.assertRaises(ValueError, f.readlines, 1)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003818 self.assertRaises(ValueError, f.seek, 0)
3819 self.assertRaises(ValueError, f.tell)
3820 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003821 self.assertRaises(ValueError, f.write,
3822 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003823 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003824 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003825
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003826 def test_blockingioerror(self):
3827 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003828 class C(str):
3829 pass
3830 c = C("")
3831 b = self.BlockingIOError(1, c)
3832 c.b = b
3833 b.c = c
3834 wr = weakref.ref(c)
3835 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003836 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003837 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003838
3839 def test_abcs(self):
3840 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003841 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3842 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3843 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3844 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003845
3846 def _check_abc_inheritance(self, abcmodule):
3847 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003848 self.assertIsInstance(f, abcmodule.IOBase)
3849 self.assertIsInstance(f, abcmodule.RawIOBase)
3850 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3851 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003852 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003853 self.assertIsInstance(f, abcmodule.IOBase)
3854 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3855 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3856 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003857 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003858 self.assertIsInstance(f, abcmodule.IOBase)
3859 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3860 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3861 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003862
3863 def test_abc_inheritance(self):
3864 # Test implementations inherit from their respective ABCs
3865 self._check_abc_inheritance(self)
3866
3867 def test_abc_inheritance_official(self):
3868 # Test implementations inherit from the official ABCs of the
3869 # baseline "io" module.
3870 self._check_abc_inheritance(io)
3871
Antoine Pitroue033e062010-10-29 10:38:18 +00003872 def _check_warn_on_dealloc(self, *args, **kwargs):
3873 f = open(*args, **kwargs)
3874 r = repr(f)
3875 with self.assertWarns(ResourceWarning) as cm:
3876 f = None
3877 support.gc_collect()
3878 self.assertIn(r, str(cm.warning.args[0]))
3879
3880 def test_warn_on_dealloc(self):
3881 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3882 self._check_warn_on_dealloc(support.TESTFN, "wb")
3883 self._check_warn_on_dealloc(support.TESTFN, "w")
3884
3885 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3886 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003887 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003888 for fd in fds:
3889 try:
3890 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003891 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003892 if e.errno != errno.EBADF:
3893 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003894 self.addCleanup(cleanup_fds)
3895 r, w = os.pipe()
3896 fds += r, w
3897 self._check_warn_on_dealloc(r, *args, **kwargs)
3898 # When using closefd=False, there's no warning
3899 r, w = os.pipe()
3900 fds += r, w
Serhiy Storchaka94a619d2016-02-11 13:11:44 +02003901 with support.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00003902 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00003903
3904 def test_warn_on_dealloc_fd(self):
3905 self._check_warn_on_dealloc_fd("rb", buffering=0)
3906 self._check_warn_on_dealloc_fd("rb")
3907 self._check_warn_on_dealloc_fd("r")
3908
3909
Antoine Pitrou243757e2010-11-05 21:15:39 +00003910 def test_pickling(self):
3911 # Pickling file objects is forbidden
3912 for kwargs in [
3913 {"mode": "w"},
3914 {"mode": "wb"},
3915 {"mode": "wb", "buffering": 0},
3916 {"mode": "r"},
3917 {"mode": "rb"},
3918 {"mode": "rb", "buffering": 0},
3919 {"mode": "w+"},
3920 {"mode": "w+b"},
3921 {"mode": "w+b", "buffering": 0},
3922 ]:
3923 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3924 with self.open(support.TESTFN, **kwargs) as f:
3925 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3926
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003927 def test_nonblock_pipe_write_bigbuf(self):
3928 self._test_nonblock_pipe_write(16*1024)
3929
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003930 def test_nonblock_pipe_write_smallbuf(self):
3931 self._test_nonblock_pipe_write(1024)
3932
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003933 @unittest.skipUnless(hasattr(os, 'set_blocking'),
3934 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003935 def _test_nonblock_pipe_write(self, bufsize):
3936 sent = []
3937 received = []
3938 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003939 os.set_blocking(r, False)
3940 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003941
3942 # To exercise all code paths in the C implementation we need
3943 # to play with buffer sizes. For instance, if we choose a
3944 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3945 # then we will never get a partial write of the buffer.
3946 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3947 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3948
3949 with rf, wf:
3950 for N in 9999, 73, 7574:
3951 try:
3952 i = 0
3953 while True:
3954 msg = bytes([i % 26 + 97]) * N
3955 sent.append(msg)
3956 wf.write(msg)
3957 i += 1
3958
3959 except self.BlockingIOError as e:
3960 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003961 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003962 sent[-1] = sent[-1][:e.characters_written]
3963 received.append(rf.read())
3964 msg = b'BLOCKED'
3965 wf.write(msg)
3966 sent.append(msg)
3967
3968 while True:
3969 try:
3970 wf.flush()
3971 break
3972 except self.BlockingIOError as e:
3973 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003974 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003975 self.assertEqual(e.characters_written, 0)
3976 received.append(rf.read())
3977
3978 received += iter(rf.read, None)
3979
3980 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003981 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003982 self.assertTrue(wf.closed)
3983 self.assertTrue(rf.closed)
3984
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003985 def test_create_fail(self):
3986 # 'x' mode fails if file is existing
3987 with self.open(support.TESTFN, 'w'):
3988 pass
3989 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3990
3991 def test_create_writes(self):
3992 # 'x' mode opens for writing
3993 with self.open(support.TESTFN, 'xb') as f:
3994 f.write(b"spam")
3995 with self.open(support.TESTFN, 'rb') as f:
3996 self.assertEqual(b"spam", f.read())
3997
Christian Heimes7b648752012-09-10 14:48:43 +02003998 def test_open_allargs(self):
3999 # there used to be a buffer overflow in the parser for rawmode
4000 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
4001
4002
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004003class CMiscIOTest(MiscIOTest):
4004 io = io
4005
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03004006 def test_readinto_buffer_overflow(self):
4007 # Issue #18025
4008 class BadReader(self.io.BufferedIOBase):
4009 def read(self, n=-1):
4010 return b'x' * 10**6
4011 bufio = BadReader()
4012 b = bytearray(2)
4013 self.assertRaises(ValueError, bufio.readinto, b)
4014
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004015 def check_daemon_threads_shutdown_deadlock(self, stream_name):
4016 # Issue #23309: deadlocks at shutdown should be avoided when a
4017 # daemon thread and the main thread both write to a file.
4018 code = """if 1:
4019 import sys
4020 import time
4021 import threading
Victor Stinner2a1aed02017-04-21 17:59:23 +02004022 from test.support import SuppressCrashReport
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004023
4024 file = sys.{stream_name}
4025
4026 def run():
4027 while True:
4028 file.write('.')
4029 file.flush()
4030
Victor Stinner2a1aed02017-04-21 17:59:23 +02004031 crash = SuppressCrashReport()
4032 crash.__enter__()
4033 # don't call __exit__(): the crash occurs at Python shutdown
4034
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004035 thread = threading.Thread(target=run)
4036 thread.daemon = True
4037 thread.start()
4038
4039 time.sleep(0.5)
4040 file.write('!')
4041 file.flush()
4042 """.format_map(locals())
4043 res, _ = run_python_until_end("-c", code)
4044 err = res.err.decode()
4045 if res.rc != 0:
4046 # Failure: should be a fatal error
4047 self.assertIn("Fatal Python error: could not acquire lock "
4048 "for <_io.BufferedWriter name='<{stream_name}>'> "
4049 "at interpreter shutdown, possibly due to "
4050 "daemon threads".format_map(locals()),
4051 err)
4052 else:
4053 self.assertFalse(err.strip('.!'))
4054
4055 def test_daemon_threads_shutdown_stdout_deadlock(self):
4056 self.check_daemon_threads_shutdown_deadlock('stdout')
4057
4058 def test_daemon_threads_shutdown_stderr_deadlock(self):
4059 self.check_daemon_threads_shutdown_deadlock('stderr')
4060
4061
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004062class PyMiscIOTest(MiscIOTest):
4063 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00004064
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004065
4066@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
4067class SignalsTest(unittest.TestCase):
4068
4069 def setUp(self):
4070 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
4071
4072 def tearDown(self):
4073 signal.signal(signal.SIGALRM, self.oldalrm)
4074
4075 def alarm_interrupt(self, sig, frame):
4076 1/0
4077
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004078 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
4079 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00004080 invokes the signal handler, and bubbles up the exception raised
4081 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004082 read_results = []
4083 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02004084 if hasattr(signal, 'pthread_sigmask'):
4085 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004086 s = os.read(r, 1)
4087 read_results.append(s)
4088 t = threading.Thread(target=_read)
4089 t.daemon = True
4090 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00004091 fdopen_kwargs["closefd"] = False
Victor Stinner9c182a12015-03-16 18:03:06 +01004092 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004093 try:
4094 wio = self.io.open(w, **fdopen_kwargs)
4095 t.start()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004096 # Fill the pipe enough that the write will be blocking.
4097 # It will be interrupted by the timer armed above. Since the
4098 # other thread has read one byte, the low-level write will
4099 # return with a successful (partial) result rather than an EINTR.
4100 # The buffered IO layer must check for pending signal
4101 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02004102 signal.alarm(1)
4103 try:
Victor Stinner9c182a12015-03-16 18:03:06 +01004104 self.assertRaises(ZeroDivisionError, wio.write, large_data)
Victor Stinner775b2dd2013-07-15 19:53:13 +02004105 finally:
4106 signal.alarm(0)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03004107 t.join()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004108 # We got one byte, get another one and check that it isn't a
4109 # repeat of the first one.
4110 read_results.append(os.read(r, 1))
4111 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
4112 finally:
4113 os.close(w)
4114 os.close(r)
4115 # This is deliberate. If we didn't close the file descriptor
4116 # before closing wio, wio would try to flush its internal
4117 # buffer, and block again.
4118 try:
4119 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004120 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004121 if e.errno != errno.EBADF:
4122 raise
4123
4124 def test_interrupted_write_unbuffered(self):
4125 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
4126
4127 def test_interrupted_write_buffered(self):
4128 self.check_interrupted_write(b"xy", b"xy", mode="wb")
4129
4130 def test_interrupted_write_text(self):
4131 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
4132
Brett Cannon31f59292011-02-21 19:29:56 +00004133 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004134 def check_reentrant_write(self, data, **fdopen_kwargs):
4135 def on_alarm(*args):
4136 # Will be called reentrantly from the same thread
4137 wio.write(data)
4138 1/0
4139 signal.signal(signal.SIGALRM, on_alarm)
4140 r, w = os.pipe()
4141 wio = self.io.open(w, **fdopen_kwargs)
4142 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004143 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004144 # Either the reentrant call to wio.write() fails with RuntimeError,
4145 # or the signal handler raises ZeroDivisionError.
4146 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
4147 while 1:
4148 for i in range(100):
4149 wio.write(data)
4150 wio.flush()
4151 # Make sure the buffer doesn't fill up and block further writes
4152 os.read(r, len(data) * 100)
4153 exc = cm.exception
4154 if isinstance(exc, RuntimeError):
4155 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
4156 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004157 signal.alarm(0)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004158 wio.close()
4159 os.close(r)
4160
4161 def test_reentrant_write_buffered(self):
4162 self.check_reentrant_write(b"xy", mode="wb")
4163
4164 def test_reentrant_write_text(self):
4165 self.check_reentrant_write("xy", mode="w", encoding="ascii")
4166
Antoine Pitrou707ce822011-02-25 21:24:11 +00004167 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
4168 """Check that a buffered read, when it gets interrupted (either
4169 returning a partial result or EINTR), properly invokes the signal
4170 handler and retries if the latter returned successfully."""
4171 r, w = os.pipe()
4172 fdopen_kwargs["closefd"] = False
4173 def alarm_handler(sig, frame):
4174 os.write(w, b"bar")
4175 signal.signal(signal.SIGALRM, alarm_handler)
4176 try:
4177 rio = self.io.open(r, **fdopen_kwargs)
4178 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004179 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004180 # Expected behaviour:
4181 # - first raw read() returns partial b"foo"
4182 # - second raw read() returns EINTR
4183 # - third raw read() returns b"bar"
4184 self.assertEqual(decode(rio.read(6)), "foobar")
4185 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004186 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004187 rio.close()
4188 os.close(w)
4189 os.close(r)
4190
Antoine Pitrou20db5112011-08-19 20:32:34 +02004191 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004192 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
4193 mode="rb")
4194
Antoine Pitrou20db5112011-08-19 20:32:34 +02004195 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004196 self.check_interrupted_read_retry(lambda x: x,
4197 mode="r")
4198
Antoine Pitrou707ce822011-02-25 21:24:11 +00004199 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
4200 """Check that a buffered write, when it gets interrupted (either
4201 returning a partial result or EINTR), properly invokes the signal
4202 handler and retries if the latter returned successfully."""
4203 select = support.import_module("select")
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004204
Antoine Pitrou707ce822011-02-25 21:24:11 +00004205 # A quantity that exceeds the buffer size of an anonymous pipe's
4206 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02004207 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00004208 r, w = os.pipe()
4209 fdopen_kwargs["closefd"] = False
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004210
Antoine Pitrou707ce822011-02-25 21:24:11 +00004211 # We need a separate thread to read from the pipe and allow the
4212 # write() to finish. This thread is started after the SIGALRM is
4213 # received (forcing a first EINTR in write()).
4214 read_results = []
4215 write_finished = False
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004216 error = None
Antoine Pitrou707ce822011-02-25 21:24:11 +00004217 def _read():
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004218 try:
4219 while not write_finished:
4220 while r in select.select([r], [], [], 1.0)[0]:
4221 s = os.read(r, 1024)
4222 read_results.append(s)
4223 except BaseException as exc:
4224 nonlocal error
4225 error = exc
Antoine Pitrou707ce822011-02-25 21:24:11 +00004226 t = threading.Thread(target=_read)
4227 t.daemon = True
4228 def alarm1(sig, frame):
4229 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004230 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004231 def alarm2(sig, frame):
4232 t.start()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004233
4234 large_data = item * N
Antoine Pitrou707ce822011-02-25 21:24:11 +00004235 signal.signal(signal.SIGALRM, alarm1)
4236 try:
4237 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004238 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004239 # Expected behaviour:
4240 # - first raw write() is partial (because of the limited pipe buffer
4241 # and the first alarm)
4242 # - second raw write() returns EINTR (because of the second alarm)
4243 # - subsequent write()s are successful (either partial or complete)
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004244 written = wio.write(large_data)
4245 self.assertEqual(N, written)
4246
Antoine Pitrou707ce822011-02-25 21:24:11 +00004247 wio.flush()
4248 write_finished = True
4249 t.join()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004250
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004251 self.assertIsNone(error)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004252 self.assertEqual(N, sum(len(x) for x in read_results))
4253 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004254 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004255 write_finished = True
4256 os.close(w)
4257 os.close(r)
4258 # This is deliberate. If we didn't close the file descriptor
4259 # before closing wio, wio would try to flush its internal
4260 # buffer, and could block (in case of failure).
4261 try:
4262 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004263 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00004264 if e.errno != errno.EBADF:
4265 raise
4266
Antoine Pitrou20db5112011-08-19 20:32:34 +02004267 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004268 self.check_interrupted_write_retry(b"x", mode="wb")
4269
Antoine Pitrou20db5112011-08-19 20:32:34 +02004270 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004271 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4272
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004273
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004274class CSignalsTest(SignalsTest):
4275 io = io
4276
4277class PySignalsTest(SignalsTest):
4278 io = pyio
4279
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004280 # Handling reentrancy issues would slow down _pyio even more, so the
4281 # tests are disabled.
4282 test_reentrant_write_buffered = None
4283 test_reentrant_write_text = None
4284
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004285
Ezio Melottidaa42c72013-03-23 16:30:16 +02004286def load_tests(*args):
Gregory P. Smith1bef9072015-04-14 13:24:34 -07004287 tests = (CIOTest, PyIOTest, APIMismatchTest,
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004288 CBufferedReaderTest, PyBufferedReaderTest,
4289 CBufferedWriterTest, PyBufferedWriterTest,
4290 CBufferedRWPairTest, PyBufferedRWPairTest,
4291 CBufferedRandomTest, PyBufferedRandomTest,
4292 StatefulIncrementalDecoderTest,
4293 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
4294 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004295 CMiscIOTest, PyMiscIOTest,
4296 CSignalsTest, PySignalsTest,
4297 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004298
4299 # Put the namespaces of the IO module we are testing and some useful mock
4300 # classes in the __dict__ of each test.
4301 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
benfogle9703f092017-11-10 16:03:40 -05004302 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
4303 SlowFlushRawIO)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004304 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
4305 c_io_ns = {name : getattr(io, name) for name in all_members}
4306 py_io_ns = {name : getattr(pyio, name) for name in all_members}
4307 globs = globals()
4308 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
4309 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
4310 # Avoid turning open into a bound method.
4311 py_io_ns["open"] = pyio.OpenWrapper
4312 for test in tests:
4313 if test.__name__.startswith("C"):
4314 for name, obj in c_io_ns.items():
4315 setattr(test, name, obj)
4316 elif test.__name__.startswith("Py"):
4317 for name, obj in py_io_ns.items():
4318 setattr(test, name, obj)
4319
Ezio Melottidaa42c72013-03-23 16:30:16 +02004320 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
4321 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004322
4323if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02004324 unittest.main()