blob: 14352ff84fff9476805b57bd85ed33c70037f2be [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020031import threading
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000032import time
Guido van Rossum28524c72007-02-27 05:47:44 +000033import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000034import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020035import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020036from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000038from test import support
Berker Peksagce643912015-05-06 06:33:17 +030039from test.support.script_helper import assert_python_ok, run_python_until_end
Serhiy Storchakab21d1552018-03-02 11:53:51 +020040from test.support import FakePath
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000041
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000042import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000043import io # C implementation of io
44import _pyio as pyio # Python implementation of io
Guido van Rossuma9e20242007-03-08 00:43:48 +000045
Martin Panter6bb91f32016-05-28 00:41:57 +000046try:
47 import ctypes
48except ImportError:
49 def byteslike(*pos, **kw):
50 return array.array("b", bytes(*pos, **kw))
51else:
52 def byteslike(*pos, **kw):
53 """Create a bytes-like object having no string or sequence methods"""
54 data = bytes(*pos, **kw)
55 obj = EmptyStruct()
56 ctypes.resize(obj, len(data))
57 memoryview(obj).cast("B")[:] = data
58 return obj
59 class EmptyStruct(ctypes.Structure):
60 pass
61
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000062def _default_chunk_size():
63 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000064 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000065 return f._CHUNK_SIZE
66
67
Antoine Pitrou328ec742010-09-14 18:37:24 +000068class MockRawIOWithoutRead:
69 """A RawIO implementation without read(), so as to exercise the default
70 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000071
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000072 def __init__(self, read_stack=()):
73 self._read_stack = list(read_stack)
74 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000075 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000076 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000077
Guido van Rossum01a27522007-03-07 01:00:12 +000078 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000079 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000080 return len(b)
81
82 def writable(self):
83 return True
84
Guido van Rossum68bbcd22007-02-27 17:19:33 +000085 def fileno(self):
86 return 42
87
88 def readable(self):
89 return True
90
Guido van Rossum01a27522007-03-07 01:00:12 +000091 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000092 return True
93
Guido van Rossum01a27522007-03-07 01:00:12 +000094 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000095 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000096
97 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000098 return 0 # same comment as above
99
100 def readinto(self, buf):
101 self._reads += 1
102 max_len = len(buf)
103 try:
104 data = self._read_stack[0]
105 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000106 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000107 return 0
108 if data is None:
109 del self._read_stack[0]
110 return None
111 n = len(data)
112 if len(data) <= max_len:
113 del self._read_stack[0]
114 buf[:n] = data
115 return n
116 else:
117 buf[:] = data[:max_len]
118 self._read_stack[0] = data[max_len:]
119 return max_len
120
121 def truncate(self, pos=None):
122 return pos
123
Antoine Pitrou328ec742010-09-14 18:37:24 +0000124class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
125 pass
126
127class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
128 pass
129
130
131class MockRawIO(MockRawIOWithoutRead):
132
133 def read(self, n=None):
134 self._reads += 1
135 try:
136 return self._read_stack.pop(0)
137 except:
138 self._extraneous_reads += 1
139 return b""
140
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000141class CMockRawIO(MockRawIO, io.RawIOBase):
142 pass
143
144class PyMockRawIO(MockRawIO, pyio.RawIOBase):
145 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000146
Guido van Rossuma9e20242007-03-08 00:43:48 +0000147
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000148class MisbehavedRawIO(MockRawIO):
149 def write(self, b):
150 return super().write(b) * 2
151
152 def read(self, n=None):
153 return super().read(n) * 2
154
155 def seek(self, pos, whence):
156 return -123
157
158 def tell(self):
159 return -456
160
161 def readinto(self, buf):
162 super().readinto(buf)
163 return len(buf) * 5
164
165class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
166 pass
167
168class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
169 pass
170
171
benfogle9703f092017-11-10 16:03:40 -0500172class SlowFlushRawIO(MockRawIO):
173 def __init__(self):
174 super().__init__()
175 self.in_flush = threading.Event()
176
177 def flush(self):
178 self.in_flush.set()
179 time.sleep(0.25)
180
181class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
182 pass
183
184class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
185 pass
186
187
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000188class CloseFailureIO(MockRawIO):
189 closed = 0
190
191 def close(self):
192 if not self.closed:
193 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200194 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000195
196class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
197 pass
198
199class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
200 pass
201
202
203class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000204
205 def __init__(self, data):
206 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000207 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000208
209 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000210 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000211 self.read_history.append(None if res is None else len(res))
212 return res
213
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000214 def readinto(self, b):
215 res = super().readinto(b)
216 self.read_history.append(res)
217 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000218
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000219class CMockFileIO(MockFileIO, io.BytesIO):
220 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000221
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000222class PyMockFileIO(MockFileIO, pyio.BytesIO):
223 pass
224
225
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000226class MockUnseekableIO:
227 def seekable(self):
228 return False
229
230 def seek(self, *args):
231 raise self.UnsupportedOperation("not seekable")
232
233 def tell(self, *args):
234 raise self.UnsupportedOperation("not seekable")
235
Martin Panter754aab22016-03-31 07:21:56 +0000236 def truncate(self, *args):
237 raise self.UnsupportedOperation("not seekable")
238
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000239class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
240 UnsupportedOperation = io.UnsupportedOperation
241
242class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
243 UnsupportedOperation = pyio.UnsupportedOperation
244
245
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000246class MockNonBlockWriterIO:
247
248 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000249 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000250 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000251
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000252 def pop_written(self):
253 s = b"".join(self._write_stack)
254 self._write_stack[:] = []
255 return s
256
257 def block_on(self, char):
258 """Block when a given char is encountered."""
259 self._blocker_char = char
260
261 def readable(self):
262 return True
263
264 def seekable(self):
265 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000266
Guido van Rossum01a27522007-03-07 01:00:12 +0000267 def writable(self):
268 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000269
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000270 def write(self, b):
271 b = bytes(b)
272 n = -1
273 if self._blocker_char:
274 try:
275 n = b.index(self._blocker_char)
276 except ValueError:
277 pass
278 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100279 if n > 0:
280 # write data up to the first blocker
281 self._write_stack.append(b[:n])
282 return n
283 else:
284 # cancel blocker and indicate would block
285 self._blocker_char = None
286 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000287 self._write_stack.append(b)
288 return len(b)
289
290class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
291 BlockingIOError = io.BlockingIOError
292
293class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
294 BlockingIOError = pyio.BlockingIOError
295
Guido van Rossuma9e20242007-03-08 00:43:48 +0000296
Guido van Rossum28524c72007-02-27 05:47:44 +0000297class IOTest(unittest.TestCase):
298
Neal Norwitze7789b12008-03-24 06:18:09 +0000299 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000300 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000301
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000302 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000303 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000304
Guido van Rossum28524c72007-02-27 05:47:44 +0000305 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000306 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000307 f.truncate(0)
308 self.assertEqual(f.tell(), 5)
309 f.seek(0)
310
311 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000312 self.assertEqual(f.seek(0), 0)
313 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000314 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000315 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000316 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000317 buffer = bytearray(b" world\n\n\n")
318 self.assertEqual(f.write(buffer), 9)
319 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000320 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000321 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000322 self.assertEqual(f.seek(-1, 2), 13)
323 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000324
Guido van Rossum87429772007-04-10 21:06:59 +0000325 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000326 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000327 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000328
Guido van Rossum9b76da62007-04-11 01:09:03 +0000329 def read_ops(self, f, buffered=False):
330 data = f.read(5)
331 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000332 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000333 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000334 self.assertEqual(bytes(data), b" worl")
335 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000336 self.assertEqual(f.readinto(data), 2)
337 self.assertEqual(len(data), 5)
338 self.assertEqual(data[:2], b"d\n")
339 self.assertEqual(f.seek(0), 0)
340 self.assertEqual(f.read(20), b"hello world\n")
341 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000342 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000343 self.assertEqual(f.seek(-6, 2), 6)
344 self.assertEqual(f.read(5), b"world")
345 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000346 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000347 self.assertEqual(f.seek(-6, 1), 5)
348 self.assertEqual(f.read(5), b" worl")
349 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000350 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000351 if buffered:
352 f.seek(0)
353 self.assertEqual(f.read(), b"hello world\n")
354 f.seek(6)
355 self.assertEqual(f.read(), b"world\n")
356 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000357 f.seek(0)
358 data = byteslike(5)
359 self.assertEqual(f.readinto1(data), 5)
360 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000361
Guido van Rossum34d69e52007-04-10 20:08:41 +0000362 LARGE = 2**31
363
Guido van Rossum53807da2007-04-10 19:01:47 +0000364 def large_file_ops(self, f):
365 assert f.readable()
366 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100367 try:
368 self.assertEqual(f.seek(self.LARGE), self.LARGE)
369 except (OverflowError, ValueError):
370 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000371 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000372 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000373 self.assertEqual(f.tell(), self.LARGE + 3)
374 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000375 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000376 self.assertEqual(f.tell(), self.LARGE + 2)
377 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000378 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000379 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000380 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
381 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000382 self.assertEqual(f.read(2), b"x")
383
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000384 def test_invalid_operations(self):
385 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000386 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000387 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000388 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000389 self.assertRaises(exc, fp.read)
390 self.assertRaises(exc, fp.readline)
391 with self.open(support.TESTFN, "wb", buffering=0) as fp:
392 self.assertRaises(exc, fp.read)
393 self.assertRaises(exc, fp.readline)
394 with self.open(support.TESTFN, "rb", buffering=0) as fp:
395 self.assertRaises(exc, fp.write, b"blah")
396 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000397 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000398 self.assertRaises(exc, fp.write, b"blah")
399 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000400 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000401 self.assertRaises(exc, fp.write, "blah")
402 self.assertRaises(exc, fp.writelines, ["blah\n"])
403 # Non-zero seeking from current or end pos
404 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
405 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000406
Martin Panter754aab22016-03-31 07:21:56 +0000407 def test_optional_abilities(self):
408 # Test for OSError when optional APIs are not supported
409 # The purpose of this test is to try fileno(), reading, writing and
410 # seeking operations with various objects that indicate they do not
411 # support these operations.
412
413 def pipe_reader():
414 [r, w] = os.pipe()
415 os.close(w) # So that read() is harmless
416 return self.FileIO(r, "r")
417
418 def pipe_writer():
419 [r, w] = os.pipe()
420 self.addCleanup(os.close, r)
421 # Guarantee that we can write into the pipe without blocking
422 thread = threading.Thread(target=os.read, args=(r, 100))
423 thread.start()
424 self.addCleanup(thread.join)
425 return self.FileIO(w, "w")
426
427 def buffered_reader():
428 return self.BufferedReader(self.MockUnseekableIO())
429
430 def buffered_writer():
431 return self.BufferedWriter(self.MockUnseekableIO())
432
433 def buffered_random():
434 return self.BufferedRandom(self.BytesIO())
435
436 def buffered_rw_pair():
437 return self.BufferedRWPair(self.MockUnseekableIO(),
438 self.MockUnseekableIO())
439
440 def text_reader():
441 class UnseekableReader(self.MockUnseekableIO):
442 writable = self.BufferedIOBase.writable
443 write = self.BufferedIOBase.write
444 return self.TextIOWrapper(UnseekableReader(), "ascii")
445
446 def text_writer():
447 class UnseekableWriter(self.MockUnseekableIO):
448 readable = self.BufferedIOBase.readable
449 read = self.BufferedIOBase.read
450 return self.TextIOWrapper(UnseekableWriter(), "ascii")
451
452 tests = (
453 (pipe_reader, "fr"), (pipe_writer, "fw"),
454 (buffered_reader, "r"), (buffered_writer, "w"),
455 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
456 (text_reader, "r"), (text_writer, "w"),
457 (self.BytesIO, "rws"), (self.StringIO, "rws"),
458 )
459 for [test, abilities] in tests:
Martin Panter754aab22016-03-31 07:21:56 +0000460 with self.subTest(test), test() as obj:
461 readable = "r" in abilities
462 self.assertEqual(obj.readable(), readable)
463 writable = "w" in abilities
464 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000465
466 if isinstance(obj, self.TextIOBase):
467 data = "3"
468 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
469 data = b"3"
470 else:
471 self.fail("Unknown base class")
472
473 if "f" in abilities:
474 obj.fileno()
475 else:
476 self.assertRaises(OSError, obj.fileno)
477
478 if readable:
479 obj.read(1)
480 obj.read()
481 else:
482 self.assertRaises(OSError, obj.read, 1)
483 self.assertRaises(OSError, obj.read)
484
485 if writable:
486 obj.write(data)
487 else:
488 self.assertRaises(OSError, obj.write, data)
489
Martin Panter3ee147f2016-03-31 21:05:31 +0000490 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000491 pipe_reader, pipe_writer):
492 # Pipes seem to appear as seekable on Windows
493 continue
494 seekable = "s" in abilities
495 self.assertEqual(obj.seekable(), seekable)
496
Martin Panter754aab22016-03-31 07:21:56 +0000497 if seekable:
498 obj.tell()
499 obj.seek(0)
500 else:
501 self.assertRaises(OSError, obj.tell)
502 self.assertRaises(OSError, obj.seek, 0)
503
504 if writable and seekable:
505 obj.truncate()
506 obj.truncate(0)
507 else:
508 self.assertRaises(OSError, obj.truncate)
509 self.assertRaises(OSError, obj.truncate, 0)
510
Antoine Pitrou13348842012-01-29 18:36:34 +0100511 def test_open_handles_NUL_chars(self):
512 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300513 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100514
515 bytes_fn = bytes(fn_with_NUL, 'ascii')
516 with warnings.catch_warnings():
517 warnings.simplefilter("ignore", DeprecationWarning)
518 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100519
Guido van Rossum28524c72007-02-27 05:47:44 +0000520 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000521 with self.open(support.TESTFN, "wb", buffering=0) as f:
522 self.assertEqual(f.readable(), False)
523 self.assertEqual(f.writable(), True)
524 self.assertEqual(f.seekable(), True)
525 self.write_ops(f)
526 with self.open(support.TESTFN, "rb", buffering=0) as f:
527 self.assertEqual(f.readable(), True)
528 self.assertEqual(f.writable(), False)
529 self.assertEqual(f.seekable(), True)
530 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000531
Guido van Rossum87429772007-04-10 21:06:59 +0000532 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000533 with self.open(support.TESTFN, "wb") as f:
534 self.assertEqual(f.readable(), False)
535 self.assertEqual(f.writable(), True)
536 self.assertEqual(f.seekable(), True)
537 self.write_ops(f)
538 with self.open(support.TESTFN, "rb") as f:
539 self.assertEqual(f.readable(), True)
540 self.assertEqual(f.writable(), False)
541 self.assertEqual(f.seekable(), True)
542 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000543
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000544 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000545 with self.open(support.TESTFN, "wb") as f:
546 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
547 with self.open(support.TESTFN, "rb") as f:
548 self.assertEqual(f.readline(), b"abc\n")
549 self.assertEqual(f.readline(10), b"def\n")
550 self.assertEqual(f.readline(2), b"xy")
551 self.assertEqual(f.readline(4), b"zzy\n")
552 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000553 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000554 self.assertRaises(TypeError, f.readline, 5.3)
555 with self.open(support.TESTFN, "r") as f:
556 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000557
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300558 def test_readline_nonsizeable(self):
559 # Issue #30061
560 # Crash when readline() returns an object without __len__
561 class R(self.IOBase):
562 def readline(self):
563 return None
564 self.assertRaises((TypeError, StopIteration), next, R())
565
566 def test_next_nonsizeable(self):
567 # Issue #30061
568 # Crash when __next__() returns an object without __len__
569 class R(self.IOBase):
570 def __next__(self):
571 return None
572 self.assertRaises(TypeError, R().readlines, 1)
573
Guido van Rossum28524c72007-02-27 05:47:44 +0000574 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000575 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000576 self.write_ops(f)
577 data = f.getvalue()
578 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000579 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000580 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000581
Guido van Rossum53807da2007-04-10 19:01:47 +0000582 def test_large_file_ops(self):
Mike53f7a7c2017-12-14 14:04:53 +0300583 # On Windows and Mac OSX this test consumes large resources; It takes
Victor Stinner8c663fd2017-11-08 14:44:44 -0800584 # a long time to build the >2 GiB file and takes >2 GiB of disk space
Guido van Rossum34d69e52007-04-10 20:08:41 +0000585 # therefore the resource must be enabled to run this test.
Victor Stinner937ee9e2018-06-26 02:11:06 +0200586 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600587 support.requires(
588 'largefile',
589 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000590 with self.open(support.TESTFN, "w+b", 0) as f:
591 self.large_file_ops(f)
592 with self.open(support.TESTFN, "w+b") as f:
593 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000594
595 def test_with_open(self):
Alexey Izbysheva2670562018-10-20 03:22:31 +0300596 for bufsize in (0, 100):
Guido van Rossum87429772007-04-10 21:06:59 +0000597 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000598 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000599 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000600 self.assertEqual(f.closed, True)
601 f = None
602 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000603 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000604 1/0
605 except ZeroDivisionError:
606 self.assertEqual(f.closed, True)
607 else:
608 self.fail("1/0 didn't raise an exception")
609
Antoine Pitrou08838b62009-01-21 00:55:13 +0000610 # issue 5008
611 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000612 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000613 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000614 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000615 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000616 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000617 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000618 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300619 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000620
Guido van Rossum87429772007-04-10 21:06:59 +0000621 def test_destructor(self):
622 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000623 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000624 def __del__(self):
625 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000626 try:
627 f = super().__del__
628 except AttributeError:
629 pass
630 else:
631 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000632 def close(self):
633 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000634 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000635 def flush(self):
636 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000637 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000638 with support.check_warnings(('', ResourceWarning)):
639 f = MyFileIO(support.TESTFN, "wb")
640 f.write(b"xxx")
641 del f
642 support.gc_collect()
643 self.assertEqual(record, [1, 2, 3])
644 with self.open(support.TESTFN, "rb") as f:
645 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000646
647 def _check_base_destructor(self, base):
648 record = []
649 class MyIO(base):
650 def __init__(self):
651 # This exercises the availability of attributes on object
652 # destruction.
653 # (in the C version, close() is called by the tp_dealloc
654 # function, not by __del__)
655 self.on_del = 1
656 self.on_close = 2
657 self.on_flush = 3
658 def __del__(self):
659 record.append(self.on_del)
660 try:
661 f = super().__del__
662 except AttributeError:
663 pass
664 else:
665 f()
666 def close(self):
667 record.append(self.on_close)
668 super().close()
669 def flush(self):
670 record.append(self.on_flush)
671 super().flush()
672 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000673 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000674 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000675 self.assertEqual(record, [1, 2, 3])
676
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000677 def test_IOBase_destructor(self):
678 self._check_base_destructor(self.IOBase)
679
680 def test_RawIOBase_destructor(self):
681 self._check_base_destructor(self.RawIOBase)
682
683 def test_BufferedIOBase_destructor(self):
684 self._check_base_destructor(self.BufferedIOBase)
685
686 def test_TextIOBase_destructor(self):
687 self._check_base_destructor(self.TextIOBase)
688
Guido van Rossum87429772007-04-10 21:06:59 +0000689 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000690 with self.open(support.TESTFN, "wb") as f:
691 f.write(b"xxx")
692 with self.open(support.TESTFN, "rb") as f:
693 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000694
Guido van Rossumd4103952007-04-12 05:44:49 +0000695 def test_array_writes(self):
696 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000697 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000698 def check(f):
699 with f:
700 self.assertEqual(f.write(a), n)
701 f.writelines((a,))
702 check(self.BytesIO())
703 check(self.FileIO(support.TESTFN, "w"))
704 check(self.BufferedWriter(self.MockRawIO()))
705 check(self.BufferedRandom(self.MockRawIO()))
706 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000707
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000708 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000709 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000710 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000711
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000712 def test_read_closed(self):
713 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000714 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000715 with self.open(support.TESTFN, "r") as f:
716 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000717 self.assertEqual(file.read(), "egg\n")
718 file.seek(0)
719 file.close()
720 self.assertRaises(ValueError, file.read)
721
722 def test_no_closefd_with_filename(self):
723 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000724 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000725
726 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000727 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000728 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000729 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000730 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000731 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000732 self.assertEqual(file.buffer.raw.closefd, False)
733
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000734 def test_garbage_collection(self):
735 # FileIO objects are collected, and collecting them flushes
736 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000737 with support.check_warnings(('', ResourceWarning)):
738 f = self.FileIO(support.TESTFN, "wb")
739 f.write(b"abcxxx")
740 f.f = f
741 wr = weakref.ref(f)
742 del f
743 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300744 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000745 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000746 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000747
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000748 def test_unbounded_file(self):
749 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
750 zero = "/dev/zero"
751 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000752 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000753 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000754 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000755 if support.real_max_memuse < support._2G:
Victor Stinner8c663fd2017-11-08 14:44:44 -0800756 self.skipTest("test requires at least 2 GiB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000757 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000758 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000759 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000760 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000761 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000762 self.assertRaises(OverflowError, f.read)
763
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200764 def check_flush_error_on_close(self, *args, **kwargs):
765 # Test that the file is closed despite failed flush
766 # and that flush() is called before file closed.
767 f = self.open(*args, **kwargs)
768 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000769 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200770 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200771 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000772 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200773 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600774 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200775 self.assertTrue(closed) # flush() called
776 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200777 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200778
779 def test_flush_error_on_close(self):
780 # raw file
781 # Issue #5700: io.FileIO calls flush() after file closed
782 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
783 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
784 self.check_flush_error_on_close(fd, 'wb', buffering=0)
785 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
786 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
787 os.close(fd)
788 # buffered io
789 self.check_flush_error_on_close(support.TESTFN, 'wb')
790 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
791 self.check_flush_error_on_close(fd, 'wb')
792 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
793 self.check_flush_error_on_close(fd, 'wb', closefd=False)
794 os.close(fd)
795 # text io
796 self.check_flush_error_on_close(support.TESTFN, 'w')
797 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
798 self.check_flush_error_on_close(fd, 'w')
799 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
800 self.check_flush_error_on_close(fd, 'w', closefd=False)
801 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000802
803 def test_multi_close(self):
804 f = self.open(support.TESTFN, "wb", buffering=0)
805 f.close()
806 f.close()
807 f.close()
808 self.assertRaises(ValueError, f.flush)
809
Antoine Pitrou328ec742010-09-14 18:37:24 +0000810 def test_RawIOBase_read(self):
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530811 # Exercise the default limited RawIOBase.read(n) implementation (which
812 # calls readinto() internally).
Antoine Pitrou328ec742010-09-14 18:37:24 +0000813 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
814 self.assertEqual(rawio.read(2), b"ab")
815 self.assertEqual(rawio.read(2), b"c")
816 self.assertEqual(rawio.read(2), b"d")
817 self.assertEqual(rawio.read(2), None)
818 self.assertEqual(rawio.read(2), b"ef")
819 self.assertEqual(rawio.read(2), b"g")
820 self.assertEqual(rawio.read(2), None)
821 self.assertEqual(rawio.read(2), b"")
822
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400823 def test_types_have_dict(self):
824 test = (
825 self.IOBase(),
826 self.RawIOBase(),
827 self.TextIOBase(),
828 self.StringIO(),
829 self.BytesIO()
830 )
831 for obj in test:
832 self.assertTrue(hasattr(obj, "__dict__"))
833
Ross Lagerwall59142db2011-10-31 20:34:46 +0200834 def test_opener(self):
835 with self.open(support.TESTFN, "w") as f:
836 f.write("egg\n")
837 fd = os.open(support.TESTFN, os.O_RDONLY)
838 def opener(path, flags):
839 return fd
840 with self.open("non-existent", "r", opener=opener) as f:
841 self.assertEqual(f.read(), "egg\n")
842
Barry Warsaw480e2852016-06-08 17:47:26 -0400843 def test_bad_opener_negative_1(self):
844 # Issue #27066.
845 def badopener(fname, flags):
846 return -1
847 with self.assertRaises(ValueError) as cm:
848 open('non-existent', 'r', opener=badopener)
849 self.assertEqual(str(cm.exception), 'opener returned -1')
850
851 def test_bad_opener_other_negative(self):
852 # Issue #27066.
853 def badopener(fname, flags):
854 return -2
855 with self.assertRaises(ValueError) as cm:
856 open('non-existent', 'r', opener=badopener)
857 self.assertEqual(str(cm.exception), 'opener returned -2')
858
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200859 def test_fileio_closefd(self):
860 # Issue #4841
861 with self.open(__file__, 'rb') as f1, \
862 self.open(__file__, 'rb') as f2:
863 fileio = self.FileIO(f1.fileno(), closefd=False)
864 # .__init__() must not close f1
865 fileio.__init__(f2.fileno(), closefd=False)
866 f1.readline()
867 # .close() must not close f2
868 fileio.close()
869 f2.readline()
870
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300871 def test_nonbuffered_textio(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200872 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300873 with self.assertRaises(ValueError):
874 self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300875
876 def test_invalid_newline(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200877 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300878 with self.assertRaises(ValueError):
879 self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300880
Martin Panter6bb91f32016-05-28 00:41:57 +0000881 def test_buffered_readinto_mixin(self):
882 # Test the implementation provided by BufferedIOBase
883 class Stream(self.BufferedIOBase):
884 def read(self, size):
885 return b"12345"
886 read1 = read
887 stream = Stream()
888 for method in ("readinto", "readinto1"):
889 with self.subTest(method):
890 buffer = byteslike(5)
891 self.assertEqual(getattr(stream, method)(buffer), 5)
892 self.assertEqual(bytes(buffer), b"12345")
893
Ethan Furmand62548a2016-06-04 14:38:43 -0700894 def test_fspath_support(self):
Ethan Furmand62548a2016-06-04 14:38:43 -0700895 def check_path_succeeds(path):
896 with self.open(path, "w") as f:
897 f.write("egg\n")
898
899 with self.open(path, "r") as f:
900 self.assertEqual(f.read(), "egg\n")
901
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200902 check_path_succeeds(FakePath(support.TESTFN))
903 check_path_succeeds(FakePath(support.TESTFN.encode('utf-8')))
Ethan Furmand62548a2016-06-04 14:38:43 -0700904
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200905 with self.open(support.TESTFN, "w") as f:
906 bad_path = FakePath(f.fileno())
907 with self.assertRaises(TypeError):
908 self.open(bad_path, 'w')
909
910 bad_path = FakePath(None)
Brett Cannonc78ca1e2016-06-24 12:03:43 -0700911 with self.assertRaises(TypeError):
Ethan Furmand62548a2016-06-04 14:38:43 -0700912 self.open(bad_path, 'w')
913
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200914 bad_path = FakePath(FloatingPointError)
915 with self.assertRaises(FloatingPointError):
916 self.open(bad_path, 'w')
917
Ethan Furmand62548a2016-06-04 14:38:43 -0700918 # ensure that refcounting is correct with some error conditions
919 with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200920 self.open(FakePath(support.TESTFN), 'rwxa')
Ethan Furmand62548a2016-06-04 14:38:43 -0700921
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530922 def test_RawIOBase_readall(self):
923 # Exercise the default unlimited RawIOBase.read() and readall()
924 # implementations.
925 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
926 self.assertEqual(rawio.read(), b"abcdefg")
927 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
928 self.assertEqual(rawio.readall(), b"abcdefg")
929
930 def test_BufferedIOBase_readinto(self):
931 # Exercise the default BufferedIOBase.readinto() and readinto1()
932 # implementations (which call read() or read1() internally).
933 class Reader(self.BufferedIOBase):
934 def __init__(self, avail):
935 self.avail = avail
936 def read(self, size):
937 result = self.avail[:size]
938 self.avail = self.avail[size:]
939 return result
940 def read1(self, size):
941 """Returns no more than 5 bytes at once"""
942 return self.read(min(size, 5))
943 tests = (
944 # (test method, total data available, read buffer size, expected
945 # read size)
946 ("readinto", 10, 5, 5),
947 ("readinto", 10, 6, 6), # More than read1() can return
948 ("readinto", 5, 6, 5), # Buffer larger than total available
949 ("readinto", 6, 7, 6),
950 ("readinto", 10, 0, 0), # Empty buffer
951 ("readinto1", 10, 5, 5), # Result limited to single read1() call
952 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return
953 ("readinto1", 5, 6, 5), # Buffer larger than total available
954 ("readinto1", 6, 7, 5),
955 ("readinto1", 10, 0, 0), # Empty buffer
956 )
957 UNUSED_BYTE = 0x81
958 for test in tests:
959 with self.subTest(test):
960 method, avail, request, result = test
961 reader = Reader(bytes(range(avail)))
962 buffer = bytearray((UNUSED_BYTE,) * request)
963 method = getattr(reader, method)
964 self.assertEqual(method(buffer), result)
965 self.assertEqual(len(buffer), request)
966 self.assertSequenceEqual(buffer[:result], range(result))
967 unused = (UNUSED_BYTE,) * (request - result)
968 self.assertSequenceEqual(buffer[result:], unused)
969 self.assertEqual(len(reader.avail), avail - result)
970
Zackery Spytz28f07362018-07-17 00:31:44 -0600971 def test_close_assert(self):
972 class R(self.IOBase):
973 def __setattr__(self, name, value):
974 pass
975 def flush(self):
976 raise OSError()
977 f = R()
978 # This would cause an assertion failure.
979 self.assertRaises(OSError, f.close)
980
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200981
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000982class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200983
984 def test_IOBase_finalize(self):
985 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
986 # class which inherits IOBase and an object of this class are caught
987 # in a reference cycle and close() is already in the method cache.
988 class MyIO(self.IOBase):
989 def close(self):
990 pass
991
992 # create an instance to populate the method cache
993 MyIO()
994 obj = MyIO()
995 obj.obj = obj
996 wr = weakref.ref(obj)
997 del MyIO
998 del obj
999 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001000 self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +00001001
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001002class PyIOTest(IOTest):
1003 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001004
Guido van Rossuma9e20242007-03-08 00:43:48 +00001005
Gregory P. Smith1bef9072015-04-14 13:24:34 -07001006@support.cpython_only
1007class APIMismatchTest(unittest.TestCase):
Gregory P. Smith054b0652015-04-14 12:58:05 -07001008
Gregory P. Smith054b0652015-04-14 12:58:05 -07001009 def test_RawIOBase_io_in_pyio_match(self):
1010 """Test that pyio RawIOBase class has all c RawIOBase methods"""
Antoine Pitrou45d61562015-05-20 21:50:59 +02001011 mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
1012 ignore=('__weakref__',))
Gregory P. Smith054b0652015-04-14 12:58:05 -07001013 self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
1014
1015 def test_RawIOBase_pyio_in_io_match(self):
1016 """Test that c RawIOBase class has all pyio RawIOBase methods"""
1017 mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
1018 self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
1019
1020
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001021class CommonBufferedTests:
1022 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
1023
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001024 def test_detach(self):
1025 raw = self.MockRawIO()
1026 buf = self.tp(raw)
1027 self.assertIs(buf.detach(), raw)
1028 self.assertRaises(ValueError, buf.detach)
1029
Benjamin Peterson10e76b62014-12-21 20:51:50 -06001030 repr(buf) # Should still work
1031
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001032 def test_fileno(self):
1033 rawio = self.MockRawIO()
1034 bufio = self.tp(rawio)
1035
Ezio Melottib3aedd42010-11-20 19:04:17 +00001036 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001037
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001038 def test_invalid_args(self):
1039 rawio = self.MockRawIO()
1040 bufio = self.tp(rawio)
1041 # Invalid whence
1042 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +02001043 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001044
1045 def test_override_destructor(self):
1046 tp = self.tp
1047 record = []
1048 class MyBufferedIO(tp):
1049 def __del__(self):
1050 record.append(1)
1051 try:
1052 f = super().__del__
1053 except AttributeError:
1054 pass
1055 else:
1056 f()
1057 def close(self):
1058 record.append(2)
1059 super().close()
1060 def flush(self):
1061 record.append(3)
1062 super().flush()
1063 rawio = self.MockRawIO()
1064 bufio = MyBufferedIO(rawio)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001065 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001066 support.gc_collect()
Martin Panter754aab22016-03-31 07:21:56 +00001067 self.assertEqual(record, [1, 2, 3])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001068
1069 def test_context_manager(self):
1070 # Test usability as a context manager
1071 rawio = self.MockRawIO()
1072 bufio = self.tp(rawio)
1073 def _with():
1074 with bufio:
1075 pass
1076 _with()
1077 # bufio should now be closed, and using it a second time should raise
1078 # a ValueError.
1079 self.assertRaises(ValueError, _with)
1080
1081 def test_error_through_destructor(self):
1082 # Test that the exception state is not modified by a destructor,
1083 # even if close() fails.
1084 rawio = self.CloseFailureIO()
1085 def f():
1086 self.tp(rawio).xyzzy
1087 with support.captured_output("stderr") as s:
1088 self.assertRaises(AttributeError, f)
1089 s = s.getvalue().strip()
1090 if s:
1091 # The destructor *may* have printed an unraisable error, check it
1092 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001093 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001094 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +00001095
Antoine Pitrou716c4442009-05-23 19:04:03 +00001096 def test_repr(self):
1097 raw = self.MockRawIO()
1098 b = self.tp(raw)
Serhiy Storchaka521e5862014-07-22 15:00:37 +03001099 clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001100 self.assertEqual(repr(b), "<%s>" % clsname)
1101 raw.name = "dummy"
1102 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
1103 raw.name = b"dummy"
1104 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
1105
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02001106 def test_recursive_repr(self):
1107 # Issue #25455
1108 raw = self.MockRawIO()
1109 b = self.tp(raw)
1110 with support.swap_attr(raw, 'name', b):
1111 try:
1112 repr(b) # Should not crash
1113 except RuntimeError:
1114 pass
1115
Antoine Pitrou6be88762010-05-03 16:48:20 +00001116 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001117 # Test that buffered file is closed despite failed flush
1118 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00001119 raw = self.MockRawIO()
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001120 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00001121 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001122 closed[:] = [b.closed, raw.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001123 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00001124 raw.flush = bad_flush
1125 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001126 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001127 self.assertTrue(b.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001128 self.assertTrue(raw.closed)
1129 self.assertTrue(closed) # flush() called
1130 self.assertFalse(closed[0]) # flush() called before file closed
1131 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02001132 raw.flush = lambda: None # break reference loop
Benjamin Peterson68623612012-12-20 11:53:11 -06001133
1134 def test_close_error_on_close(self):
1135 raw = self.MockRawIO()
1136 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001137 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -06001138 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001139 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -06001140 raw.close = bad_close
1141 b = self.tp(raw)
1142 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001143 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001144 b.close()
1145 self.assertEqual(err.exception.args, ('close',))
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001146 self.assertIsInstance(err.exception.__context__, OSError)
Benjamin Peterson68623612012-12-20 11:53:11 -06001147 self.assertEqual(err.exception.__context__.args, ('flush',))
1148 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00001149
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001150 def test_nonnormalized_close_error_on_close(self):
1151 # Issue #21677
1152 raw = self.MockRawIO()
1153 def bad_flush():
1154 raise non_existing_flush
1155 def bad_close():
1156 raise non_existing_close
1157 raw.close = bad_close
1158 b = self.tp(raw)
1159 b.flush = bad_flush
1160 with self.assertRaises(NameError) as err: # exception not swallowed
1161 b.close()
1162 self.assertIn('non_existing_close', str(err.exception))
1163 self.assertIsInstance(err.exception.__context__, NameError)
1164 self.assertIn('non_existing_flush', str(err.exception.__context__))
1165 self.assertFalse(b.closed)
1166
Antoine Pitrou6be88762010-05-03 16:48:20 +00001167 def test_multi_close(self):
1168 raw = self.MockRawIO()
1169 b = self.tp(raw)
1170 b.close()
1171 b.close()
1172 b.close()
1173 self.assertRaises(ValueError, b.flush)
1174
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001175 def test_unseekable(self):
1176 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1177 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1178 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1179
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00001180 def test_readonly_attributes(self):
1181 raw = self.MockRawIO()
1182 buf = self.tp(raw)
1183 x = self.MockRawIO()
1184 with self.assertRaises(AttributeError):
1185 buf.raw = x
1186
Guido van Rossum78892e42007-04-06 17:31:18 +00001187
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001188class SizeofTest:
1189
1190 @support.cpython_only
1191 def test_sizeof(self):
1192 bufsize1 = 4096
1193 bufsize2 = 8192
1194 rawio = self.MockRawIO()
1195 bufio = self.tp(rawio, buffer_size=bufsize1)
1196 size = sys.getsizeof(bufio) - bufsize1
1197 rawio = self.MockRawIO()
1198 bufio = self.tp(rawio, buffer_size=bufsize2)
1199 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
1200
Jesus Ceadc469452012-10-04 12:37:56 +02001201 @support.cpython_only
1202 def test_buffer_freeing(self) :
1203 bufsize = 4096
1204 rawio = self.MockRawIO()
1205 bufio = self.tp(rawio, buffer_size=bufsize)
1206 size = sys.getsizeof(bufio) - bufsize
1207 bufio.close()
1208 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001209
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001210class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1211 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001212
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001213 def test_constructor(self):
1214 rawio = self.MockRawIO([b"abc"])
1215 bufio = self.tp(rawio)
1216 bufio.__init__(rawio)
1217 bufio.__init__(rawio, buffer_size=1024)
1218 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001219 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001220 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1221 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1222 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1223 rawio = self.MockRawIO([b"abc"])
1224 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001225 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001226
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001227 def test_uninitialized(self):
1228 bufio = self.tp.__new__(self.tp)
1229 del bufio
1230 bufio = self.tp.__new__(self.tp)
1231 self.assertRaisesRegex((ValueError, AttributeError),
1232 'uninitialized|has no attribute',
1233 bufio.read, 0)
1234 bufio.__init__(self.MockRawIO())
1235 self.assertEqual(bufio.read(0), b'')
1236
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001237 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001238 for arg in (None, 7):
1239 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1240 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001241 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001242 # Invalid args
1243 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001244
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001245 def test_read1(self):
1246 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1247 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001248 self.assertEqual(b"a", bufio.read(1))
1249 self.assertEqual(b"b", bufio.read1(1))
1250 self.assertEqual(rawio._reads, 1)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001251 self.assertEqual(b"", bufio.read1(0))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001252 self.assertEqual(b"c", bufio.read1(100))
1253 self.assertEqual(rawio._reads, 1)
1254 self.assertEqual(b"d", bufio.read1(100))
1255 self.assertEqual(rawio._reads, 2)
1256 self.assertEqual(b"efg", bufio.read1(100))
1257 self.assertEqual(rawio._reads, 3)
1258 self.assertEqual(b"", bufio.read1(100))
1259 self.assertEqual(rawio._reads, 4)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001260
1261 def test_read1_arbitrary(self):
1262 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1263 bufio = self.tp(rawio)
1264 self.assertEqual(b"a", bufio.read(1))
1265 self.assertEqual(b"bc", bufio.read1())
1266 self.assertEqual(b"d", bufio.read1())
1267 self.assertEqual(b"efg", bufio.read1(-1))
1268 self.assertEqual(rawio._reads, 3)
1269 self.assertEqual(b"", bufio.read1())
1270 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001271
1272 def test_readinto(self):
1273 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1274 bufio = self.tp(rawio)
1275 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001276 self.assertEqual(bufio.readinto(b), 2)
1277 self.assertEqual(b, b"ab")
1278 self.assertEqual(bufio.readinto(b), 2)
1279 self.assertEqual(b, b"cd")
1280 self.assertEqual(bufio.readinto(b), 2)
1281 self.assertEqual(b, b"ef")
1282 self.assertEqual(bufio.readinto(b), 1)
1283 self.assertEqual(b, b"gf")
1284 self.assertEqual(bufio.readinto(b), 0)
1285 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +02001286 rawio = self.MockRawIO((b"abc", None))
1287 bufio = self.tp(rawio)
1288 self.assertEqual(bufio.readinto(b), 2)
1289 self.assertEqual(b, b"ab")
1290 self.assertEqual(bufio.readinto(b), 1)
1291 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001292
Benjamin Petersona96fea02014-06-22 14:17:44 -07001293 def test_readinto1(self):
1294 buffer_size = 10
1295 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1296 bufio = self.tp(rawio, buffer_size=buffer_size)
1297 b = bytearray(2)
1298 self.assertEqual(bufio.peek(3), b'abc')
1299 self.assertEqual(rawio._reads, 1)
1300 self.assertEqual(bufio.readinto1(b), 2)
1301 self.assertEqual(b, b"ab")
1302 self.assertEqual(rawio._reads, 1)
1303 self.assertEqual(bufio.readinto1(b), 1)
1304 self.assertEqual(b[:1], b"c")
1305 self.assertEqual(rawio._reads, 1)
1306 self.assertEqual(bufio.readinto1(b), 2)
1307 self.assertEqual(b, b"de")
1308 self.assertEqual(rawio._reads, 2)
1309 b = bytearray(2*buffer_size)
1310 self.assertEqual(bufio.peek(3), b'fgh')
1311 self.assertEqual(rawio._reads, 3)
1312 self.assertEqual(bufio.readinto1(b), 6)
1313 self.assertEqual(b[:6], b"fghjkl")
1314 self.assertEqual(rawio._reads, 4)
1315
1316 def test_readinto_array(self):
1317 buffer_size = 60
1318 data = b"a" * 26
1319 rawio = self.MockRawIO((data,))
1320 bufio = self.tp(rawio, buffer_size=buffer_size)
1321
1322 # Create an array with element size > 1 byte
1323 b = array.array('i', b'x' * 32)
1324 assert len(b) != 16
1325
1326 # Read into it. We should get as many *bytes* as we can fit into b
1327 # (which is more than the number of elements)
1328 n = bufio.readinto(b)
1329 self.assertGreater(n, len(b))
1330
1331 # Check that old contents of b are preserved
1332 bm = memoryview(b).cast('B')
1333 self.assertLess(n, len(bm))
1334 self.assertEqual(bm[:n], data[:n])
1335 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1336
1337 def test_readinto1_array(self):
1338 buffer_size = 60
1339 data = b"a" * 26
1340 rawio = self.MockRawIO((data,))
1341 bufio = self.tp(rawio, buffer_size=buffer_size)
1342
1343 # Create an array with element size > 1 byte
1344 b = array.array('i', b'x' * 32)
1345 assert len(b) != 16
1346
1347 # Read into it. We should get as many *bytes* as we can fit into b
1348 # (which is more than the number of elements)
1349 n = bufio.readinto1(b)
1350 self.assertGreater(n, len(b))
1351
1352 # Check that old contents of b are preserved
1353 bm = memoryview(b).cast('B')
1354 self.assertLess(n, len(bm))
1355 self.assertEqual(bm[:n], data[:n])
1356 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1357
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001358 def test_readlines(self):
1359 def bufio():
1360 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1361 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001362 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1363 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1364 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001365
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001366 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001367 data = b"abcdefghi"
1368 dlen = len(data)
1369
1370 tests = [
1371 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1372 [ 100, [ 3, 3, 3], [ dlen ] ],
1373 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1374 ]
1375
1376 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001377 rawio = self.MockFileIO(data)
1378 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001379 pos = 0
1380 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001381 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001382 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001383 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001384 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001385
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001386 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +00001387 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001388 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1389 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001390 self.assertEqual(b"abcd", bufio.read(6))
1391 self.assertEqual(b"e", bufio.read(1))
1392 self.assertEqual(b"fg", bufio.read())
1393 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +02001394 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001395 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +00001396
Victor Stinnera80987f2011-05-25 22:47:16 +02001397 rawio = self.MockRawIO((b"a", None, None))
1398 self.assertEqual(b"a", rawio.readall())
1399 self.assertIsNone(rawio.readall())
1400
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001401 def test_read_past_eof(self):
1402 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1403 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001404
Ezio Melottib3aedd42010-11-20 19:04:17 +00001405 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001406
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001407 def test_read_all(self):
1408 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1409 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001410
Ezio Melottib3aedd42010-11-20 19:04:17 +00001411 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001412
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001413 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001414 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001415 try:
1416 # Write out many bytes with exactly the same number of 0's,
1417 # 1's... 255's. This will help us check that concurrent reading
1418 # doesn't duplicate or forget contents.
1419 N = 1000
1420 l = list(range(256)) * N
1421 random.shuffle(l)
1422 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001423 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +00001424 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001425 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001426 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001427 errors = []
1428 results = []
1429 def f():
1430 try:
1431 # Intra-buffer read then buffer-flushing read
1432 for n in cycle([1, 19]):
1433 s = bufio.read(n)
1434 if not s:
1435 break
1436 # list.append() is atomic
1437 results.append(s)
1438 except Exception as e:
1439 errors.append(e)
1440 raise
1441 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001442 with support.start_threads(threads):
1443 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001444 self.assertFalse(errors,
1445 "the following exceptions were caught: %r" % errors)
1446 s = b''.join(results)
1447 for i in range(256):
1448 c = bytes(bytearray([i]))
1449 self.assertEqual(s.count(c), N)
1450 finally:
1451 support.unlink(support.TESTFN)
1452
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001453 def test_unseekable(self):
1454 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1455 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1456 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1457 bufio.read(1)
1458 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1459 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1460
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001461 def test_misbehaved_io(self):
1462 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1463 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001464 self.assertRaises(OSError, bufio.seek, 0)
1465 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001466
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001467 def test_no_extraneous_read(self):
1468 # Issue #9550; when the raw IO object has satisfied the read request,
1469 # we should not issue any additional reads, otherwise it may block
1470 # (e.g. socket).
1471 bufsize = 16
1472 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1473 rawio = self.MockRawIO([b"x" * n])
1474 bufio = self.tp(rawio, bufsize)
1475 self.assertEqual(bufio.read(n), b"x" * n)
1476 # Simple case: one raw read is enough to satisfy the request.
1477 self.assertEqual(rawio._extraneous_reads, 0,
1478 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1479 # A more complex case where two raw reads are needed to satisfy
1480 # the request.
1481 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1482 bufio = self.tp(rawio, bufsize)
1483 self.assertEqual(bufio.read(n), b"x" * n)
1484 self.assertEqual(rawio._extraneous_reads, 0,
1485 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1486
Berker Peksagd10d6ae2015-05-12 17:01:05 +03001487 def test_read_on_closed(self):
1488 # Issue #23796
1489 b = io.BufferedReader(io.BytesIO(b"12"))
1490 b.read(1)
1491 b.close()
1492 self.assertRaises(ValueError, b.peek)
1493 self.assertRaises(ValueError, b.read1, 1)
1494
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001495
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001496class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001497 tp = io.BufferedReader
1498
1499 def test_constructor(self):
1500 BufferedReaderTest.test_constructor(self)
1501 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08001502 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001503 if sys.maxsize > 0x7FFFFFFF:
1504 rawio = self.MockRawIO()
1505 bufio = self.tp(rawio)
1506 self.assertRaises((OverflowError, MemoryError, ValueError),
1507 bufio.__init__, rawio, sys.maxsize)
1508
1509 def test_initialization(self):
1510 rawio = self.MockRawIO([b"abc"])
1511 bufio = self.tp(rawio)
1512 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1513 self.assertRaises(ValueError, bufio.read)
1514 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1515 self.assertRaises(ValueError, bufio.read)
1516 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1517 self.assertRaises(ValueError, bufio.read)
1518
1519 def test_misbehaved_io_read(self):
1520 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1521 bufio = self.tp(rawio)
1522 # _pyio.BufferedReader seems to implement reading different, so that
1523 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001524 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001525
1526 def test_garbage_collection(self):
1527 # C BufferedReader objects are collected.
1528 # The Python version has __del__, so it ends into gc.garbage instead
Serhiy Storchakae36837c2018-06-04 05:53:56 +03001529 self.addCleanup(support.unlink, support.TESTFN)
Antoine Pitrou796564c2013-07-30 19:59:21 +02001530 with support.check_warnings(('', ResourceWarning)):
1531 rawio = self.FileIO(support.TESTFN, "w+b")
1532 f = self.tp(rawio)
1533 f.f = f
1534 wr = weakref.ref(f)
1535 del f
1536 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001537 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001538
R David Murray67bfe802013-02-23 21:51:05 -05001539 def test_args_error(self):
1540 # Issue #17275
1541 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1542 self.tp(io.BytesIO(), 1024, 1024, 1024)
1543
1544
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001545class PyBufferedReaderTest(BufferedReaderTest):
1546 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001547
Guido van Rossuma9e20242007-03-08 00:43:48 +00001548
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001549class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1550 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001551
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001552 def test_constructor(self):
1553 rawio = self.MockRawIO()
1554 bufio = self.tp(rawio)
1555 bufio.__init__(rawio)
1556 bufio.__init__(rawio, buffer_size=1024)
1557 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001558 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001559 bufio.flush()
1560 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1561 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1562 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1563 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001564 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001565 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001566 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001567
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001568 def test_uninitialized(self):
1569 bufio = self.tp.__new__(self.tp)
1570 del bufio
1571 bufio = self.tp.__new__(self.tp)
1572 self.assertRaisesRegex((ValueError, AttributeError),
1573 'uninitialized|has no attribute',
1574 bufio.write, b'')
1575 bufio.__init__(self.MockRawIO())
1576 self.assertEqual(bufio.write(b''), 0)
1577
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001578 def test_detach_flush(self):
1579 raw = self.MockRawIO()
1580 buf = self.tp(raw)
1581 buf.write(b"howdy!")
1582 self.assertFalse(raw._write_stack)
1583 buf.detach()
1584 self.assertEqual(raw._write_stack, [b"howdy!"])
1585
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001586 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001587 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001588 writer = self.MockRawIO()
1589 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001590 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001591 self.assertFalse(writer._write_stack)
Martin Panter6bb91f32016-05-28 00:41:57 +00001592 buffer = bytearray(b"def")
1593 bufio.write(buffer)
1594 buffer[:] = b"***" # Overwrite our copy of the data
1595 bufio.flush()
1596 self.assertEqual(b"".join(writer._write_stack), b"abcdef")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001597
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001598 def test_write_overflow(self):
1599 writer = self.MockRawIO()
1600 bufio = self.tp(writer, 8)
1601 contents = b"abcdefghijklmnop"
1602 for n in range(0, len(contents), 3):
1603 bufio.write(contents[n:n+3])
1604 flushed = b"".join(writer._write_stack)
1605 # At least (total - 8) bytes were implicitly flushed, perhaps more
1606 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001607 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001608
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001609 def check_writes(self, intermediate_func):
1610 # Lots of writes, test the flushed output is as expected.
1611 contents = bytes(range(256)) * 1000
1612 n = 0
1613 writer = self.MockRawIO()
1614 bufio = self.tp(writer, 13)
1615 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1616 def gen_sizes():
1617 for size in count(1):
1618 for i in range(15):
1619 yield size
1620 sizes = gen_sizes()
1621 while n < len(contents):
1622 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001623 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001624 intermediate_func(bufio)
1625 n += size
1626 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001627 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001628
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001629 def test_writes(self):
1630 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001631
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001632 def test_writes_and_flushes(self):
1633 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001634
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001635 def test_writes_and_seeks(self):
1636 def _seekabs(bufio):
1637 pos = bufio.tell()
1638 bufio.seek(pos + 1, 0)
1639 bufio.seek(pos - 1, 0)
1640 bufio.seek(pos, 0)
1641 self.check_writes(_seekabs)
1642 def _seekrel(bufio):
1643 pos = bufio.seek(0, 1)
1644 bufio.seek(+1, 1)
1645 bufio.seek(-1, 1)
1646 bufio.seek(pos, 0)
1647 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001648
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001649 def test_writes_and_truncates(self):
1650 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001651
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001652 def test_write_non_blocking(self):
1653 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001654 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001655
Ezio Melottib3aedd42010-11-20 19:04:17 +00001656 self.assertEqual(bufio.write(b"abcd"), 4)
1657 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001658 # 1 byte will be written, the rest will be buffered
1659 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001660 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001661
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001662 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1663 raw.block_on(b"0")
1664 try:
1665 bufio.write(b"opqrwxyz0123456789")
1666 except self.BlockingIOError as e:
1667 written = e.characters_written
1668 else:
1669 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001670 self.assertEqual(written, 16)
1671 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001672 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001673
Ezio Melottib3aedd42010-11-20 19:04:17 +00001674 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001675 s = raw.pop_written()
1676 # Previously buffered bytes were flushed
1677 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001678
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001679 def test_write_and_rewind(self):
1680 raw = io.BytesIO()
1681 bufio = self.tp(raw, 4)
1682 self.assertEqual(bufio.write(b"abcdef"), 6)
1683 self.assertEqual(bufio.tell(), 6)
1684 bufio.seek(0, 0)
1685 self.assertEqual(bufio.write(b"XY"), 2)
1686 bufio.seek(6, 0)
1687 self.assertEqual(raw.getvalue(), b"XYcdef")
1688 self.assertEqual(bufio.write(b"123456"), 6)
1689 bufio.flush()
1690 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001691
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001692 def test_flush(self):
1693 writer = self.MockRawIO()
1694 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001695 bufio.write(b"abc")
1696 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001697 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001698
Antoine Pitrou131a4892012-10-16 22:57:11 +02001699 def test_writelines(self):
1700 l = [b'ab', b'cd', b'ef']
1701 writer = self.MockRawIO()
1702 bufio = self.tp(writer, 8)
1703 bufio.writelines(l)
1704 bufio.flush()
1705 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1706
1707 def test_writelines_userlist(self):
1708 l = UserList([b'ab', b'cd', b'ef'])
1709 writer = self.MockRawIO()
1710 bufio = self.tp(writer, 8)
1711 bufio.writelines(l)
1712 bufio.flush()
1713 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1714
1715 def test_writelines_error(self):
1716 writer = self.MockRawIO()
1717 bufio = self.tp(writer, 8)
1718 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1719 self.assertRaises(TypeError, bufio.writelines, None)
1720 self.assertRaises(TypeError, bufio.writelines, 'abc')
1721
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001722 def test_destructor(self):
1723 writer = self.MockRawIO()
1724 bufio = self.tp(writer, 8)
1725 bufio.write(b"abc")
1726 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001727 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001728 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001729
1730 def test_truncate(self):
1731 # Truncate implicitly flushes the buffer.
Serhiy Storchakae36837c2018-06-04 05:53:56 +03001732 self.addCleanup(support.unlink, support.TESTFN)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001733 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001734 bufio = self.tp(raw, 8)
1735 bufio.write(b"abcdef")
1736 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001737 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001738 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001739 self.assertEqual(f.read(), b"abc")
1740
Nitish Chandra059f58c2018-01-28 21:30:09 +05301741 def test_truncate_after_write(self):
1742 # Ensure that truncate preserves the file position after
1743 # writes longer than the buffer size.
1744 # Issue: https://bugs.python.org/issue32228
Serhiy Storchakae36837c2018-06-04 05:53:56 +03001745 self.addCleanup(support.unlink, support.TESTFN)
Nitish Chandra059f58c2018-01-28 21:30:09 +05301746 with self.open(support.TESTFN, "wb") as f:
1747 # Fill with some buffer
1748 f.write(b'\x00' * 10000)
1749 buffer_sizes = [8192, 4096, 200]
1750 for buffer_size in buffer_sizes:
1751 with self.open(support.TESTFN, "r+b", buffering=buffer_size) as f:
1752 f.write(b'\x00' * (buffer_size + 1))
1753 # After write write_pos and write_end are set to 0
1754 f.read(1)
1755 # read operation makes sure that pos != raw_pos
1756 f.truncate()
1757 self.assertEqual(f.tell(), buffer_size + 2)
1758
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001759 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001760 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001761 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001762 # Write out many bytes from many threads and test they were
1763 # all flushed.
1764 N = 1000
1765 contents = bytes(range(256)) * N
1766 sizes = cycle([1, 19])
1767 n = 0
1768 queue = deque()
1769 while n < len(contents):
1770 size = next(sizes)
1771 queue.append(contents[n:n+size])
1772 n += size
1773 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001774 # We use a real file object because it allows us to
1775 # exercise situations where the GIL is released before
1776 # writing the buffer to the raw streams. This is in addition
1777 # to concurrency issues due to switching threads in the middle
1778 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001779 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001780 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001781 errors = []
1782 def f():
1783 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001784 while True:
1785 try:
1786 s = queue.popleft()
1787 except IndexError:
1788 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001789 bufio.write(s)
1790 except Exception as e:
1791 errors.append(e)
1792 raise
1793 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001794 with support.start_threads(threads):
1795 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001796 self.assertFalse(errors,
1797 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001798 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001799 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001800 s = f.read()
1801 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001802 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001803 finally:
1804 support.unlink(support.TESTFN)
1805
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001806 def test_misbehaved_io(self):
1807 rawio = self.MisbehavedRawIO()
1808 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001809 self.assertRaises(OSError, bufio.seek, 0)
1810 self.assertRaises(OSError, bufio.tell)
1811 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001812
Florent Xicluna109d5732012-07-07 17:03:22 +02001813 def test_max_buffer_size_removal(self):
1814 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001815 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001816
Benjamin Peterson68623612012-12-20 11:53:11 -06001817 def test_write_error_on_close(self):
1818 raw = self.MockRawIO()
1819 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001820 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001821 raw.write = bad_write
1822 b = self.tp(raw)
1823 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001824 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001825 self.assertTrue(b.closed)
1826
benfogle9703f092017-11-10 16:03:40 -05001827 def test_slow_close_from_thread(self):
1828 # Issue #31976
1829 rawio = self.SlowFlushRawIO()
1830 bufio = self.tp(rawio, 8)
1831 t = threading.Thread(target=bufio.close)
1832 t.start()
1833 rawio.in_flush.wait()
1834 self.assertRaises(ValueError, bufio.write, b'spam')
1835 self.assertTrue(bufio.closed)
1836 t.join()
1837
1838
Benjamin Peterson59406a92009-03-26 17:10:29 +00001839
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001840class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001841 tp = io.BufferedWriter
1842
1843 def test_constructor(self):
1844 BufferedWriterTest.test_constructor(self)
1845 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08001846 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001847 if sys.maxsize > 0x7FFFFFFF:
1848 rawio = self.MockRawIO()
1849 bufio = self.tp(rawio)
1850 self.assertRaises((OverflowError, MemoryError, ValueError),
1851 bufio.__init__, rawio, sys.maxsize)
1852
1853 def test_initialization(self):
1854 rawio = self.MockRawIO()
1855 bufio = self.tp(rawio)
1856 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1857 self.assertRaises(ValueError, bufio.write, b"def")
1858 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1859 self.assertRaises(ValueError, bufio.write, b"def")
1860 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1861 self.assertRaises(ValueError, bufio.write, b"def")
1862
1863 def test_garbage_collection(self):
1864 # C BufferedWriter objects are collected, and collecting them flushes
1865 # all data to disk.
1866 # The Python version has __del__, so it ends into gc.garbage instead
Serhiy Storchakae36837c2018-06-04 05:53:56 +03001867 self.addCleanup(support.unlink, support.TESTFN)
Antoine Pitrou796564c2013-07-30 19:59:21 +02001868 with support.check_warnings(('', ResourceWarning)):
1869 rawio = self.FileIO(support.TESTFN, "w+b")
1870 f = self.tp(rawio)
1871 f.write(b"123xxx")
1872 f.x = f
1873 wr = weakref.ref(f)
1874 del f
1875 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001876 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001877 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001878 self.assertEqual(f.read(), b"123xxx")
1879
R David Murray67bfe802013-02-23 21:51:05 -05001880 def test_args_error(self):
1881 # Issue #17275
1882 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1883 self.tp(io.BytesIO(), 1024, 1024, 1024)
1884
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001885
1886class PyBufferedWriterTest(BufferedWriterTest):
1887 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001888
Guido van Rossum01a27522007-03-07 01:00:12 +00001889class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001890
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001891 def test_constructor(self):
1892 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001893 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001894
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001895 def test_uninitialized(self):
1896 pair = self.tp.__new__(self.tp)
1897 del pair
1898 pair = self.tp.__new__(self.tp)
1899 self.assertRaisesRegex((ValueError, AttributeError),
1900 'uninitialized|has no attribute',
1901 pair.read, 0)
1902 self.assertRaisesRegex((ValueError, AttributeError),
1903 'uninitialized|has no attribute',
1904 pair.write, b'')
1905 pair.__init__(self.MockRawIO(), self.MockRawIO())
1906 self.assertEqual(pair.read(0), b'')
1907 self.assertEqual(pair.write(b''), 0)
1908
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001909 def test_detach(self):
1910 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1911 self.assertRaises(self.UnsupportedOperation, pair.detach)
1912
Florent Xicluna109d5732012-07-07 17:03:22 +02001913 def test_constructor_max_buffer_size_removal(self):
1914 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001915 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001916
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001917 def test_constructor_with_not_readable(self):
1918 class NotReadable(MockRawIO):
1919 def readable(self):
1920 return False
1921
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001922 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001923
1924 def test_constructor_with_not_writeable(self):
1925 class NotWriteable(MockRawIO):
1926 def writable(self):
1927 return False
1928
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001929 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001930
1931 def test_read(self):
1932 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1933
1934 self.assertEqual(pair.read(3), b"abc")
1935 self.assertEqual(pair.read(1), b"d")
1936 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001937 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1938 self.assertEqual(pair.read(None), b"abc")
1939
1940 def test_readlines(self):
1941 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1942 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1943 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1944 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001945
1946 def test_read1(self):
1947 # .read1() is delegated to the underlying reader object, so this test
1948 # can be shallow.
1949 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1950
1951 self.assertEqual(pair.read1(3), b"abc")
Martin Panterccb2c0e2016-10-20 23:48:14 +00001952 self.assertEqual(pair.read1(), b"def")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001953
1954 def test_readinto(self):
Martin Panter6bb91f32016-05-28 00:41:57 +00001955 for method in ("readinto", "readinto1"):
1956 with self.subTest(method):
1957 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001958
Serhiy Storchaka5f1a5182016-09-11 14:41:02 +03001959 data = byteslike(b'\0' * 5)
Martin Panter6bb91f32016-05-28 00:41:57 +00001960 self.assertEqual(getattr(pair, method)(data), 5)
1961 self.assertEqual(bytes(data), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001962
1963 def test_write(self):
1964 w = self.MockRawIO()
1965 pair = self.tp(self.MockRawIO(), w)
1966
1967 pair.write(b"abc")
1968 pair.flush()
Martin Panter6bb91f32016-05-28 00:41:57 +00001969 buffer = bytearray(b"def")
1970 pair.write(buffer)
1971 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001972 pair.flush()
1973 self.assertEqual(w._write_stack, [b"abc", b"def"])
1974
1975 def test_peek(self):
1976 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1977
1978 self.assertTrue(pair.peek(3).startswith(b"abc"))
1979 self.assertEqual(pair.read(3), b"abc")
1980
1981 def test_readable(self):
1982 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1983 self.assertTrue(pair.readable())
1984
1985 def test_writeable(self):
1986 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1987 self.assertTrue(pair.writable())
1988
1989 def test_seekable(self):
1990 # BufferedRWPairs are never seekable, even if their readers and writers
1991 # are.
1992 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1993 self.assertFalse(pair.seekable())
1994
1995 # .flush() is delegated to the underlying writer object and has been
1996 # tested in the test_write method.
1997
1998 def test_close_and_closed(self):
1999 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2000 self.assertFalse(pair.closed)
2001 pair.close()
2002 self.assertTrue(pair.closed)
2003
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002004 def test_reader_close_error_on_close(self):
2005 def reader_close():
2006 reader_non_existing
2007 reader = self.MockRawIO()
2008 reader.close = reader_close
2009 writer = self.MockRawIO()
2010 pair = self.tp(reader, writer)
2011 with self.assertRaises(NameError) as err:
2012 pair.close()
2013 self.assertIn('reader_non_existing', str(err.exception))
2014 self.assertTrue(pair.closed)
2015 self.assertFalse(reader.closed)
2016 self.assertTrue(writer.closed)
2017
2018 def test_writer_close_error_on_close(self):
2019 def writer_close():
2020 writer_non_existing
2021 reader = self.MockRawIO()
2022 writer = self.MockRawIO()
2023 writer.close = writer_close
2024 pair = self.tp(reader, writer)
2025 with self.assertRaises(NameError) as err:
2026 pair.close()
2027 self.assertIn('writer_non_existing', str(err.exception))
2028 self.assertFalse(pair.closed)
2029 self.assertTrue(reader.closed)
2030 self.assertFalse(writer.closed)
2031
2032 def test_reader_writer_close_error_on_close(self):
2033 def reader_close():
2034 reader_non_existing
2035 def writer_close():
2036 writer_non_existing
2037 reader = self.MockRawIO()
2038 reader.close = reader_close
2039 writer = self.MockRawIO()
2040 writer.close = writer_close
2041 pair = self.tp(reader, writer)
2042 with self.assertRaises(NameError) as err:
2043 pair.close()
2044 self.assertIn('reader_non_existing', str(err.exception))
2045 self.assertIsInstance(err.exception.__context__, NameError)
2046 self.assertIn('writer_non_existing', str(err.exception.__context__))
2047 self.assertFalse(pair.closed)
2048 self.assertFalse(reader.closed)
2049 self.assertFalse(writer.closed)
2050
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002051 def test_isatty(self):
2052 class SelectableIsAtty(MockRawIO):
2053 def __init__(self, isatty):
2054 MockRawIO.__init__(self)
2055 self._isatty = isatty
2056
2057 def isatty(self):
2058 return self._isatty
2059
2060 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
2061 self.assertFalse(pair.isatty())
2062
2063 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
2064 self.assertTrue(pair.isatty())
2065
2066 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
2067 self.assertTrue(pair.isatty())
2068
2069 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
2070 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00002071
Benjamin Petersonbbd0a322014-09-29 22:46:57 -04002072 def test_weakref_clearing(self):
2073 brw = self.tp(self.MockRawIO(), self.MockRawIO())
2074 ref = weakref.ref(brw)
2075 brw = None
2076 ref = None # Shouldn't segfault.
2077
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002078class CBufferedRWPairTest(BufferedRWPairTest):
2079 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002080
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002081class PyBufferedRWPairTest(BufferedRWPairTest):
2082 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002083
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002084
2085class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
2086 read_mode = "rb+"
2087 write_mode = "wb+"
2088
2089 def test_constructor(self):
2090 BufferedReaderTest.test_constructor(self)
2091 BufferedWriterTest.test_constructor(self)
2092
Serhiy Storchaka61e24932014-02-12 10:52:35 +02002093 def test_uninitialized(self):
2094 BufferedReaderTest.test_uninitialized(self)
2095 BufferedWriterTest.test_uninitialized(self)
2096
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002097 def test_read_and_write(self):
2098 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00002099 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00002100
2101 self.assertEqual(b"as", rw.read(2))
2102 rw.write(b"ddd")
2103 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002104 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002105 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002106 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00002107
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002108 def test_seek_and_tell(self):
2109 raw = self.BytesIO(b"asdfghjkl")
2110 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00002111
Ezio Melottib3aedd42010-11-20 19:04:17 +00002112 self.assertEqual(b"as", rw.read(2))
2113 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002114 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002115 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00002116
Antoine Pitroue05565e2011-08-20 14:39:23 +02002117 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00002118 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02002119 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002120 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002121 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002122 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002123 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002124 self.assertEqual(7, rw.tell())
2125 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02002126 rw.flush()
2127 self.assertEqual(b"asdf123fl", raw.getvalue())
2128
Christian Heimes8e42a0a2007-11-08 18:04:45 +00002129 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00002130
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002131 def check_flush_and_read(self, read_func):
2132 raw = self.BytesIO(b"abcdefghi")
2133 bufio = self.tp(raw)
2134
Ezio Melottib3aedd42010-11-20 19:04:17 +00002135 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002136 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002137 self.assertEqual(b"ef", read_func(bufio, 2))
2138 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002139 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002140 self.assertEqual(6, bufio.tell())
2141 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002142 raw.seek(0, 0)
2143 raw.write(b"XYZ")
2144 # flush() resets the read buffer
2145 bufio.flush()
2146 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002147 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002148
2149 def test_flush_and_read(self):
2150 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2151
2152 def test_flush_and_readinto(self):
2153 def _readinto(bufio, n=-1):
2154 b = bytearray(n if n >= 0 else 9999)
2155 n = bufio.readinto(b)
2156 return bytes(b[:n])
2157 self.check_flush_and_read(_readinto)
2158
2159 def test_flush_and_peek(self):
2160 def _peek(bufio, n=-1):
2161 # This relies on the fact that the buffer can contain the whole
2162 # raw stream, otherwise peek() can return less.
2163 b = bufio.peek(n)
2164 if n != -1:
2165 b = b[:n]
2166 bufio.seek(len(b), 1)
2167 return b
2168 self.check_flush_and_read(_peek)
2169
2170 def test_flush_and_write(self):
2171 raw = self.BytesIO(b"abcdefghi")
2172 bufio = self.tp(raw)
2173
2174 bufio.write(b"123")
2175 bufio.flush()
2176 bufio.write(b"45")
2177 bufio.flush()
2178 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002179 self.assertEqual(b"12345fghi", raw.getvalue())
2180 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002181
2182 def test_threads(self):
2183 BufferedReaderTest.test_threads(self)
2184 BufferedWriterTest.test_threads(self)
2185
2186 def test_writes_and_peek(self):
2187 def _peek(bufio):
2188 bufio.peek(1)
2189 self.check_writes(_peek)
2190 def _peek(bufio):
2191 pos = bufio.tell()
2192 bufio.seek(-1, 1)
2193 bufio.peek(1)
2194 bufio.seek(pos, 0)
2195 self.check_writes(_peek)
2196
2197 def test_writes_and_reads(self):
2198 def _read(bufio):
2199 bufio.seek(-1, 1)
2200 bufio.read(1)
2201 self.check_writes(_read)
2202
2203 def test_writes_and_read1s(self):
2204 def _read1(bufio):
2205 bufio.seek(-1, 1)
2206 bufio.read1(1)
2207 self.check_writes(_read1)
2208
2209 def test_writes_and_readintos(self):
2210 def _read(bufio):
2211 bufio.seek(-1, 1)
2212 bufio.readinto(bytearray(1))
2213 self.check_writes(_read)
2214
Antoine Pitroua0ceb732009-08-06 20:29:56 +00002215 def test_write_after_readahead(self):
2216 # Issue #6629: writing after the buffer was filled by readahead should
2217 # first rewind the raw stream.
2218 for overwrite_size in [1, 5]:
2219 raw = self.BytesIO(b"A" * 10)
2220 bufio = self.tp(raw, 4)
2221 # Trigger readahead
2222 self.assertEqual(bufio.read(1), b"A")
2223 self.assertEqual(bufio.tell(), 1)
2224 # Overwriting should rewind the raw stream if it needs so
2225 bufio.write(b"B" * overwrite_size)
2226 self.assertEqual(bufio.tell(), overwrite_size + 1)
2227 # If the write size was smaller than the buffer size, flush() and
2228 # check that rewind happens.
2229 bufio.flush()
2230 self.assertEqual(bufio.tell(), overwrite_size + 1)
2231 s = raw.getvalue()
2232 self.assertEqual(s,
2233 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2234
Antoine Pitrou7c404892011-05-13 00:13:33 +02002235 def test_write_rewind_write(self):
2236 # Various combinations of reading / writing / seeking backwards / writing again
2237 def mutate(bufio, pos1, pos2):
2238 assert pos2 >= pos1
2239 # Fill the buffer
2240 bufio.seek(pos1)
2241 bufio.read(pos2 - pos1)
2242 bufio.write(b'\x02')
2243 # This writes earlier than the previous write, but still inside
2244 # the buffer.
2245 bufio.seek(pos1)
2246 bufio.write(b'\x01')
2247
2248 b = b"\x80\x81\x82\x83\x84"
2249 for i in range(0, len(b)):
2250 for j in range(i, len(b)):
2251 raw = self.BytesIO(b)
2252 bufio = self.tp(raw, 100)
2253 mutate(bufio, i, j)
2254 bufio.flush()
2255 expected = bytearray(b)
2256 expected[j] = 2
2257 expected[i] = 1
2258 self.assertEqual(raw.getvalue(), expected,
2259 "failed result for i=%d, j=%d" % (i, j))
2260
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00002261 def test_truncate_after_read_or_write(self):
2262 raw = self.BytesIO(b"A" * 10)
2263 bufio = self.tp(raw, 100)
2264 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2265 self.assertEqual(bufio.truncate(), 2)
2266 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2267 self.assertEqual(bufio.truncate(), 4)
2268
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002269 def test_misbehaved_io(self):
2270 BufferedReaderTest.test_misbehaved_io(self)
2271 BufferedWriterTest.test_misbehaved_io(self)
2272
Antoine Pitroue05565e2011-08-20 14:39:23 +02002273 def test_interleaved_read_write(self):
2274 # Test for issue #12213
2275 with self.BytesIO(b'abcdefgh') as raw:
2276 with self.tp(raw, 100) as f:
2277 f.write(b"1")
2278 self.assertEqual(f.read(1), b'b')
2279 f.write(b'2')
2280 self.assertEqual(f.read1(1), b'd')
2281 f.write(b'3')
2282 buf = bytearray(1)
2283 f.readinto(buf)
2284 self.assertEqual(buf, b'f')
2285 f.write(b'4')
2286 self.assertEqual(f.peek(1), b'h')
2287 f.flush()
2288 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2289
2290 with self.BytesIO(b'abc') as raw:
2291 with self.tp(raw, 100) as f:
2292 self.assertEqual(f.read(1), b'a')
2293 f.write(b"2")
2294 self.assertEqual(f.read(1), b'c')
2295 f.flush()
2296 self.assertEqual(raw.getvalue(), b'a2c')
2297
2298 def test_interleaved_readline_write(self):
2299 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2300 with self.tp(raw) as f:
2301 f.write(b'1')
2302 self.assertEqual(f.readline(), b'b\n')
2303 f.write(b'2')
2304 self.assertEqual(f.readline(), b'def\n')
2305 f.write(b'3')
2306 self.assertEqual(f.readline(), b'\n')
2307 f.flush()
2308 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2309
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002310 # You can't construct a BufferedRandom over a non-seekable stream.
2311 test_unseekable = None
2312
R David Murray67bfe802013-02-23 21:51:05 -05002313
Antoine Pitrou10f0c502012-07-29 19:02:46 +02002314class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002315 tp = io.BufferedRandom
2316
2317 def test_constructor(self):
2318 BufferedRandomTest.test_constructor(self)
2319 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08002320 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002321 if sys.maxsize > 0x7FFFFFFF:
2322 rawio = self.MockRawIO()
2323 bufio = self.tp(rawio)
2324 self.assertRaises((OverflowError, MemoryError, ValueError),
2325 bufio.__init__, rawio, sys.maxsize)
2326
2327 def test_garbage_collection(self):
2328 CBufferedReaderTest.test_garbage_collection(self)
2329 CBufferedWriterTest.test_garbage_collection(self)
2330
R David Murray67bfe802013-02-23 21:51:05 -05002331 def test_args_error(self):
2332 # Issue #17275
2333 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
2334 self.tp(io.BytesIO(), 1024, 1024, 1024)
2335
2336
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002337class PyBufferedRandomTest(BufferedRandomTest):
2338 tp = pyio.BufferedRandom
2339
2340
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002341# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2342# properties:
2343# - A single output character can correspond to many bytes of input.
2344# - The number of input bytes to complete the character can be
2345# undetermined until the last input byte is received.
2346# - The number of input bytes can vary depending on previous input.
2347# - A single input byte can correspond to many characters of output.
2348# - The number of output characters can be undetermined until the
2349# last input byte is received.
2350# - The number of output characters can vary depending on previous input.
2351
2352class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
2353 """
2354 For testing seek/tell behavior with a stateful, buffering decoder.
2355
2356 Input is a sequence of words. Words may be fixed-length (length set
2357 by input) or variable-length (period-terminated). In variable-length
2358 mode, extra periods are ignored. Possible words are:
2359 - 'i' followed by a number sets the input length, I (maximum 99).
2360 When I is set to 0, words are space-terminated.
2361 - 'o' followed by a number sets the output length, O (maximum 99).
2362 - Any other word is converted into a word followed by a period on
2363 the output. The output word consists of the input word truncated
2364 or padded out with hyphens to make its length equal to O. If O
2365 is 0, the word is output verbatim without truncating or padding.
2366 I and O are initially set to 1. When I changes, any buffered input is
2367 re-scanned according to the new I. EOF also terminates the last word.
2368 """
2369
2370 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00002371 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002372 self.reset()
2373
2374 def __repr__(self):
2375 return '<SID %x>' % id(self)
2376
2377 def reset(self):
2378 self.i = 1
2379 self.o = 1
2380 self.buffer = bytearray()
2381
2382 def getstate(self):
2383 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
2384 return bytes(self.buffer), i*100 + o
2385
2386 def setstate(self, state):
2387 buffer, io = state
2388 self.buffer = bytearray(buffer)
2389 i, o = divmod(io, 100)
2390 self.i, self.o = i ^ 1, o ^ 1
2391
2392 def decode(self, input, final=False):
2393 output = ''
2394 for b in input:
2395 if self.i == 0: # variable-length, terminated with period
2396 if b == ord('.'):
2397 if self.buffer:
2398 output += self.process_word()
2399 else:
2400 self.buffer.append(b)
2401 else: # fixed-length, terminate after self.i bytes
2402 self.buffer.append(b)
2403 if len(self.buffer) == self.i:
2404 output += self.process_word()
2405 if final and self.buffer: # EOF terminates the last word
2406 output += self.process_word()
2407 return output
2408
2409 def process_word(self):
2410 output = ''
2411 if self.buffer[0] == ord('i'):
2412 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2413 elif self.buffer[0] == ord('o'):
2414 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2415 else:
2416 output = self.buffer.decode('ascii')
2417 if len(output) < self.o:
2418 output += '-'*self.o # pad out with hyphens
2419 if self.o:
2420 output = output[:self.o] # truncate to output length
2421 output += '.'
2422 self.buffer = bytearray()
2423 return output
2424
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002425 codecEnabled = False
2426
2427 @classmethod
2428 def lookupTestDecoder(cls, name):
2429 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00002430 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002431 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00002432 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002433 incrementalencoder=None,
2434 streamreader=None, streamwriter=None,
2435 incrementaldecoder=cls)
2436
2437# Register the previous decoder for testing.
2438# Disabled by default, tests will enable it.
2439codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2440
2441
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002442class StatefulIncrementalDecoderTest(unittest.TestCase):
2443 """
2444 Make sure the StatefulIncrementalDecoder actually works.
2445 """
2446
2447 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002448 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002449 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002450 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002451 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002452 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002453 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002454 # I=0, O=6 (variable-length input, fixed-length output)
2455 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2456 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002457 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002458 # I=6, O=3 (fixed-length input > fixed-length output)
2459 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2460 # I=0, then 3; O=29, then 15 (with longer output)
2461 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2462 'a----------------------------.' +
2463 'b----------------------------.' +
2464 'cde--------------------------.' +
2465 'abcdefghijabcde.' +
2466 'a.b------------.' +
2467 '.c.------------.' +
2468 'd.e------------.' +
2469 'k--------------.' +
2470 'l--------------.' +
2471 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002472 ]
2473
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002474 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002475 # Try a few one-shot test cases.
2476 for input, eof, output in self.test_cases:
2477 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002478 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002479
2480 # Also test an unfinished decode, followed by forcing EOF.
2481 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002482 self.assertEqual(d.decode(b'oiabcd'), '')
2483 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002484
2485class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002486
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002487 def setUp(self):
2488 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2489 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002490 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002491
Guido van Rossumd0712812007-04-11 16:32:43 +00002492 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002493 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002494
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002495 def test_constructor(self):
2496 r = self.BytesIO(b"\xc3\xa9\n\n")
2497 b = self.BufferedReader(r, 1000)
2498 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002499 t.__init__(b, encoding="latin-1", newline="\r\n")
2500 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002501 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002502 t.__init__(b, encoding="utf-8", line_buffering=True)
2503 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002504 self.assertEqual(t.line_buffering, True)
2505 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002506 self.assertRaises(TypeError, t.__init__, b, newline=42)
2507 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2508
Serhiy Storchakaf24131f2015-04-16 11:19:43 +03002509 def test_uninitialized(self):
2510 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2511 del t
2512 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2513 self.assertRaises(Exception, repr, t)
2514 self.assertRaisesRegex((ValueError, AttributeError),
2515 'uninitialized|has no attribute',
2516 t.read, 0)
2517 t.__init__(self.MockRawIO())
2518 self.assertEqual(t.read(0), '')
2519
Nick Coghlana9b15242014-02-04 22:11:18 +10002520 def test_non_text_encoding_codecs_are_rejected(self):
2521 # Ensure the constructor complains if passed a codec that isn't
2522 # marked as a text encoding
2523 # http://bugs.python.org/issue20404
2524 r = self.BytesIO()
2525 b = self.BufferedWriter(r)
2526 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2527 self.TextIOWrapper(b, encoding="hex")
2528
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002529 def test_detach(self):
2530 r = self.BytesIO()
2531 b = self.BufferedWriter(r)
2532 t = self.TextIOWrapper(b)
2533 self.assertIs(t.detach(), b)
2534
2535 t = self.TextIOWrapper(b, encoding="ascii")
2536 t.write("howdy")
2537 self.assertFalse(r.getvalue())
2538 t.detach()
2539 self.assertEqual(r.getvalue(), b"howdy")
2540 self.assertRaises(ValueError, t.detach)
2541
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002542 # Operations independent of the detached stream should still work
2543 repr(t)
2544 self.assertEqual(t.encoding, "ascii")
2545 self.assertEqual(t.errors, "strict")
2546 self.assertFalse(t.line_buffering)
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002547 self.assertFalse(t.write_through)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002548
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002549 def test_repr(self):
2550 raw = self.BytesIO("hello".encode("utf-8"))
2551 b = self.BufferedReader(raw)
2552 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002553 modname = self.TextIOWrapper.__module__
2554 self.assertEqual(repr(t),
2555 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2556 raw.name = "dummy"
2557 self.assertEqual(repr(t),
2558 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002559 t.mode = "r"
2560 self.assertEqual(repr(t),
2561 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002562 raw.name = b"dummy"
2563 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002564 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002565
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002566 t.buffer.detach()
2567 repr(t) # Should not raise an exception
2568
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02002569 def test_recursive_repr(self):
2570 # Issue #25455
2571 raw = self.BytesIO()
2572 t = self.TextIOWrapper(raw)
2573 with support.swap_attr(raw, 'name', t):
2574 try:
2575 repr(t) # Should not crash
2576 except RuntimeError:
2577 pass
2578
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002579 def test_line_buffering(self):
2580 r = self.BytesIO()
2581 b = self.BufferedWriter(r, 1000)
2582 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002583 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002584 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002585 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002586 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002587 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002588 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002589
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002590 def test_reconfigure_line_buffering(self):
2591 r = self.BytesIO()
2592 b = self.BufferedWriter(r, 1000)
2593 t = self.TextIOWrapper(b, newline="\n", line_buffering=False)
2594 t.write("AB\nC")
2595 self.assertEqual(r.getvalue(), b"")
2596
2597 t.reconfigure(line_buffering=True) # implicit flush
2598 self.assertEqual(r.getvalue(), b"AB\nC")
2599 t.write("DEF\nG")
2600 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2601 t.write("H")
2602 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2603 t.reconfigure(line_buffering=False) # implicit flush
2604 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2605 t.write("IJ")
2606 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2607
2608 # Keeping default value
2609 t.reconfigure()
2610 t.reconfigure(line_buffering=None)
2611 self.assertEqual(t.line_buffering, False)
2612 t.reconfigure(line_buffering=True)
2613 t.reconfigure()
2614 t.reconfigure(line_buffering=None)
2615 self.assertEqual(t.line_buffering, True)
2616
Victor Stinner91106cd2017-12-13 12:29:09 +01002617 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002618 def test_default_encoding(self):
2619 old_environ = dict(os.environ)
2620 try:
2621 # try to get a user preferred encoding different than the current
2622 # locale encoding to check that TextIOWrapper() uses the current
2623 # locale encoding and not the user preferred encoding
2624 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2625 if key in os.environ:
2626 del os.environ[key]
2627
2628 current_locale_encoding = locale.getpreferredencoding(False)
2629 b = self.BytesIO()
2630 t = self.TextIOWrapper(b)
2631 self.assertEqual(t.encoding, current_locale_encoding)
2632 finally:
2633 os.environ.clear()
2634 os.environ.update(old_environ)
2635
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002636 @support.cpython_only
Victor Stinner91106cd2017-12-13 12:29:09 +01002637 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Serhiy Storchaka78980432013-01-15 01:12:17 +02002638 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002639 # Issue 15989
2640 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002641 b = self.BytesIO()
2642 b.fileno = lambda: _testcapi.INT_MAX + 1
2643 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2644 b.fileno = lambda: _testcapi.UINT_MAX + 1
2645 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2646
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002647 def test_encoding(self):
2648 # Check the encoding attribute is always set, and valid
2649 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002650 t = self.TextIOWrapper(b, encoding="utf-8")
2651 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002652 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002653 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002654 codecs.lookup(t.encoding)
2655
2656 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002657 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002658 b = self.BytesIO(b"abc\n\xff\n")
2659 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002660 self.assertRaises(UnicodeError, t.read)
2661 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002662 b = self.BytesIO(b"abc\n\xff\n")
2663 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002664 self.assertRaises(UnicodeError, t.read)
2665 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002666 b = self.BytesIO(b"abc\n\xff\n")
2667 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002668 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002669 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002670 b = self.BytesIO(b"abc\n\xff\n")
2671 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002672 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002673
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002674 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002675 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002676 b = self.BytesIO()
2677 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002678 self.assertRaises(UnicodeError, t.write, "\xff")
2679 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002680 b = self.BytesIO()
2681 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002682 self.assertRaises(UnicodeError, t.write, "\xff")
2683 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002684 b = self.BytesIO()
2685 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002686 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002687 t.write("abc\xffdef\n")
2688 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002689 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002690 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002691 b = self.BytesIO()
2692 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002693 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002694 t.write("abc\xffdef\n")
2695 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002696 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002697
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002698 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002699 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2700
2701 tests = [
2702 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002703 [ '', input_lines ],
2704 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2705 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2706 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002707 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002708 encodings = (
2709 'utf-8', 'latin-1',
2710 'utf-16', 'utf-16-le', 'utf-16-be',
2711 'utf-32', 'utf-32-le', 'utf-32-be',
2712 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002713
Guido van Rossum8358db22007-08-18 21:39:55 +00002714 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002715 # character in TextIOWrapper._pending_line.
2716 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002717 # XXX: str.encode() should return bytes
2718 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002719 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002720 for bufsize in range(1, 10):
2721 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002722 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2723 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002724 encoding=encoding)
2725 if do_reads:
2726 got_lines = []
2727 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002728 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002729 if c2 == '':
2730 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002731 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002732 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002733 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002734 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002735
2736 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002737 self.assertEqual(got_line, exp_line)
2738 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002739
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002740 def test_newlines_input(self):
2741 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002742 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2743 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002744 (None, normalized.decode("ascii").splitlines(keepends=True)),
2745 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002746 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2747 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2748 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002749 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002750 buf = self.BytesIO(testdata)
2751 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002752 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002753 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002754 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002755
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002756 def test_newlines_output(self):
2757 testdict = {
2758 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2759 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2760 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2761 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2762 }
2763 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2764 for newline, expected in tests:
2765 buf = self.BytesIO()
2766 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2767 txt.write("AAA\nB")
2768 txt.write("BB\nCCC\n")
2769 txt.write("X\rY\r\nZ")
2770 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002771 self.assertEqual(buf.closed, False)
2772 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002773
2774 def test_destructor(self):
2775 l = []
2776 base = self.BytesIO
2777 class MyBytesIO(base):
2778 def close(self):
2779 l.append(self.getvalue())
2780 base.close(self)
2781 b = MyBytesIO()
2782 t = self.TextIOWrapper(b, encoding="ascii")
2783 t.write("abc")
2784 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002785 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002786 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002787
2788 def test_override_destructor(self):
2789 record = []
2790 class MyTextIO(self.TextIOWrapper):
2791 def __del__(self):
2792 record.append(1)
2793 try:
2794 f = super().__del__
2795 except AttributeError:
2796 pass
2797 else:
2798 f()
2799 def close(self):
2800 record.append(2)
2801 super().close()
2802 def flush(self):
2803 record.append(3)
2804 super().flush()
2805 b = self.BytesIO()
2806 t = MyTextIO(b, encoding="ascii")
2807 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002808 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002809 self.assertEqual(record, [1, 2, 3])
2810
2811 def test_error_through_destructor(self):
2812 # Test that the exception state is not modified by a destructor,
2813 # even if close() fails.
2814 rawio = self.CloseFailureIO()
2815 def f():
2816 self.TextIOWrapper(rawio).xyzzy
2817 with support.captured_output("stderr") as s:
2818 self.assertRaises(AttributeError, f)
2819 s = s.getvalue().strip()
2820 if s:
2821 # The destructor *may* have printed an unraisable error, check it
2822 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002823 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002824 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002825
Guido van Rossum9b76da62007-04-11 01:09:03 +00002826 # Systematic tests of the text I/O API
2827
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002828 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002829 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002830 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002831 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002832 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002833 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002834 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002835 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002836 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002837 self.assertEqual(f.tell(), 0)
2838 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002839 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002840 self.assertEqual(f.seek(0), 0)
2841 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002842 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002843 self.assertEqual(f.read(2), "ab")
2844 self.assertEqual(f.read(1), "c")
2845 self.assertEqual(f.read(1), "")
2846 self.assertEqual(f.read(), "")
2847 self.assertEqual(f.tell(), cookie)
2848 self.assertEqual(f.seek(0), 0)
2849 self.assertEqual(f.seek(0, 2), cookie)
2850 self.assertEqual(f.write("def"), 3)
2851 self.assertEqual(f.seek(cookie), cookie)
2852 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002853 if enc.startswith("utf"):
2854 self.multi_line_test(f, enc)
2855 f.close()
2856
2857 def multi_line_test(self, f, enc):
2858 f.seek(0)
2859 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002860 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002861 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002862 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002863 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002864 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002865 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002866 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002867 wlines.append((f.tell(), line))
2868 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002869 f.seek(0)
2870 rlines = []
2871 while True:
2872 pos = f.tell()
2873 line = f.readline()
2874 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002875 break
2876 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002877 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002878
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002879 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002880 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002881 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002882 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002883 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002884 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002885 p2 = f.tell()
2886 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002887 self.assertEqual(f.tell(), p0)
2888 self.assertEqual(f.readline(), "\xff\n")
2889 self.assertEqual(f.tell(), p1)
2890 self.assertEqual(f.readline(), "\xff\n")
2891 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002892 f.seek(0)
2893 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002894 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002895 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002896 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002897 f.close()
2898
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002899 def test_seeking(self):
2900 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002901 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002902 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002903 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002904 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002905 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002906 suffix = bytes(u_suffix.encode("utf-8"))
2907 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002908 with self.open(support.TESTFN, "wb") as f:
2909 f.write(line*2)
2910 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2911 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002912 self.assertEqual(s, str(prefix, "ascii"))
2913 self.assertEqual(f.tell(), prefix_size)
2914 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002915
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002916 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002917 # Regression test for a specific bug
2918 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002919 with self.open(support.TESTFN, "wb") as f:
2920 f.write(data)
2921 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2922 f._CHUNK_SIZE # Just test that it exists
2923 f._CHUNK_SIZE = 2
2924 f.readline()
2925 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002926
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002927 def test_seek_and_tell(self):
2928 #Test seek/tell using the StatefulIncrementalDecoder.
2929 # Make test faster by doing smaller seeks
2930 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002931
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002932 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002933 """Tell/seek to various points within a data stream and ensure
2934 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002935 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002936 f.write(data)
2937 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002938 f = self.open(support.TESTFN, encoding='test_decoder')
2939 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002940 decoded = f.read()
2941 f.close()
2942
Neal Norwitze2b07052008-03-18 19:52:05 +00002943 for i in range(min_pos, len(decoded) + 1): # seek positions
2944 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002945 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002946 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002947 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002948 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002949 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002950 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002951 f.close()
2952
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002953 # Enable the test decoder.
2954 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002955
2956 # Run the tests.
2957 try:
2958 # Try each test case.
2959 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002960 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002961
2962 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002963 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2964 offset = CHUNK_SIZE - len(input)//2
2965 prefix = b'.'*offset
2966 # Don't bother seeking into the prefix (takes too long).
2967 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002968 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002969
2970 # Ensure our test decoder won't interfere with subsequent tests.
2971 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002972 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002973
Christopher Thorneac22f6a2018-11-01 10:48:49 +00002974 def test_multibyte_seek_and_tell(self):
2975 f = self.open(support.TESTFN, "w", encoding="euc_jp")
2976 f.write("AB\n\u3046\u3048\n")
2977 f.close()
2978
2979 f = self.open(support.TESTFN, "r", encoding="euc_jp")
2980 self.assertEqual(f.readline(), "AB\n")
2981 p0 = f.tell()
2982 self.assertEqual(f.readline(), "\u3046\u3048\n")
2983 p1 = f.tell()
2984 f.seek(p0)
2985 self.assertEqual(f.readline(), "\u3046\u3048\n")
2986 self.assertEqual(f.tell(), p1)
2987 f.close()
2988
2989 def test_seek_with_encoder_state(self):
2990 f = self.open(support.TESTFN, "w", encoding="euc_jis_2004")
2991 f.write("\u00e6\u0300")
2992 p0 = f.tell()
2993 f.write("\u00e6")
2994 f.seek(p0)
2995 f.write("\u0300")
2996 f.close()
2997
2998 f = self.open(support.TESTFN, "r", encoding="euc_jis_2004")
2999 self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
3000 f.close()
3001
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003002 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003003 data = "1234567890"
3004 tests = ("utf-16",
3005 "utf-16-le",
3006 "utf-16-be",
3007 "utf-32",
3008 "utf-32-le",
3009 "utf-32-be")
3010 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003011 buf = self.BytesIO()
3012 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003013 # Check if the BOM is written only once (see issue1753).
3014 f.write(data)
3015 f.write(data)
3016 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003017 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00003018 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003019 self.assertEqual(f.read(), data * 2)
3020 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003021
Benjamin Petersona1b49012009-03-31 23:11:32 +00003022 def test_unreadable(self):
3023 class UnReadable(self.BytesIO):
3024 def readable(self):
3025 return False
3026 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003027 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00003028
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003029 def test_read_one_by_one(self):
3030 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003031 reads = ""
3032 while True:
3033 c = txt.read(1)
3034 if not c:
3035 break
3036 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003037 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003038
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00003039 def test_readlines(self):
3040 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
3041 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
3042 txt.seek(0)
3043 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
3044 txt.seek(0)
3045 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
3046
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003047 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003048 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003049 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003050 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003051 reads = ""
3052 while True:
3053 c = txt.read(128)
3054 if not c:
3055 break
3056 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003057 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003058
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02003059 def test_writelines(self):
3060 l = ['ab', 'cd', 'ef']
3061 buf = self.BytesIO()
3062 txt = self.TextIOWrapper(buf)
3063 txt.writelines(l)
3064 txt.flush()
3065 self.assertEqual(buf.getvalue(), b'abcdef')
3066
3067 def test_writelines_userlist(self):
3068 l = UserList(['ab', 'cd', 'ef'])
3069 buf = self.BytesIO()
3070 txt = self.TextIOWrapper(buf)
3071 txt.writelines(l)
3072 txt.flush()
3073 self.assertEqual(buf.getvalue(), b'abcdef')
3074
3075 def test_writelines_error(self):
3076 txt = self.TextIOWrapper(self.BytesIO())
3077 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3078 self.assertRaises(TypeError, txt.writelines, None)
3079 self.assertRaises(TypeError, txt.writelines, b'abc')
3080
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003081 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003082 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003083
3084 # read one char at a time
3085 reads = ""
3086 while True:
3087 c = txt.read(1)
3088 if not c:
3089 break
3090 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003091 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003092
3093 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003094 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003095 txt._CHUNK_SIZE = 4
3096
3097 reads = ""
3098 while True:
3099 c = txt.read(4)
3100 if not c:
3101 break
3102 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003103 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003104
3105 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003106 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003107 txt._CHUNK_SIZE = 4
3108
3109 reads = txt.read(4)
3110 reads += txt.read(4)
3111 reads += txt.readline()
3112 reads += txt.readline()
3113 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003114 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003115
3116 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003117 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003118 txt._CHUNK_SIZE = 4
3119
3120 reads = txt.read(4)
3121 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003122 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003123
3124 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003125 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003126 txt._CHUNK_SIZE = 4
3127
3128 reads = txt.read(4)
3129 pos = txt.tell()
3130 txt.seek(0)
3131 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003132 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003133
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003134 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003135 buffer = self.BytesIO(self.testdata)
3136 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003137
3138 self.assertEqual(buffer.seekable(), txt.seekable())
3139
Antoine Pitroue4501852009-05-14 18:55:55 +00003140 def test_append_bom(self):
3141 # The BOM is not written again when appending to a non-empty file
3142 filename = support.TESTFN
3143 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3144 with self.open(filename, 'w', encoding=charset) as f:
3145 f.write('aaa')
3146 pos = f.tell()
3147 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003148 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003149
3150 with self.open(filename, 'a', encoding=charset) as f:
3151 f.write('xxx')
3152 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003153 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003154
3155 def test_seek_bom(self):
3156 # Same test, but when seeking manually
3157 filename = support.TESTFN
3158 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3159 with self.open(filename, 'w', encoding=charset) as f:
3160 f.write('aaa')
3161 pos = f.tell()
3162 with self.open(filename, 'r+', encoding=charset) as f:
3163 f.seek(pos)
3164 f.write('zzz')
3165 f.seek(0)
3166 f.write('bbb')
3167 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003168 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003169
Antoine Pitrou85e3ee72015-04-13 20:01:21 +02003170 def test_seek_append_bom(self):
3171 # Same test, but first seek to the start and then to the end
3172 filename = support.TESTFN
3173 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3174 with self.open(filename, 'w', encoding=charset) as f:
3175 f.write('aaa')
3176 with self.open(filename, 'a', encoding=charset) as f:
3177 f.seek(0)
3178 f.seek(0, self.SEEK_END)
3179 f.write('xxx')
3180 with self.open(filename, 'rb') as f:
3181 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3182
Benjamin Peterson0926ad12009-06-06 18:02:12 +00003183 def test_errors_property(self):
3184 with self.open(support.TESTFN, "w") as f:
3185 self.assertEqual(f.errors, "strict")
3186 with self.open(support.TESTFN, "w", errors="replace") as f:
3187 self.assertEqual(f.errors, "replace")
3188
Brett Cannon31f59292011-02-21 19:29:56 +00003189 @support.no_tracing
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003190 def test_threads_write(self):
3191 # Issue6750: concurrent writes could duplicate data
3192 event = threading.Event()
3193 with self.open(support.TESTFN, "w", buffering=1) as f:
3194 def run(n):
3195 text = "Thread%03d\n" % n
3196 event.wait()
3197 f.write(text)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003198 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003199 for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003200 with support.start_threads(threads, event.set):
3201 time.sleep(0.02)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003202 with self.open(support.TESTFN) as f:
3203 content = f.read()
3204 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003205 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003206
Antoine Pitrou6be88762010-05-03 16:48:20 +00003207 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003208 # Test that text file is closed despite failed flush
3209 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00003210 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003211 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00003212 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003213 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003214 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00003215 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003216 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06003217 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003218 self.assertTrue(txt.buffer.closed)
3219 self.assertTrue(closed) # flush() called
3220 self.assertFalse(closed[0]) # flush() called before file closed
3221 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02003222 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003223
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003224 def test_close_error_on_close(self):
3225 buffer = self.BytesIO(self.testdata)
3226 def bad_flush():
3227 raise OSError('flush')
3228 def bad_close():
3229 raise OSError('close')
3230 buffer.close = bad_close
3231 txt = self.TextIOWrapper(buffer, encoding="ascii")
3232 txt.flush = bad_flush
3233 with self.assertRaises(OSError) as err: # exception not swallowed
3234 txt.close()
3235 self.assertEqual(err.exception.args, ('close',))
3236 self.assertIsInstance(err.exception.__context__, OSError)
3237 self.assertEqual(err.exception.__context__.args, ('flush',))
3238 self.assertFalse(txt.closed)
3239
3240 def test_nonnormalized_close_error_on_close(self):
3241 # Issue #21677
3242 buffer = self.BytesIO(self.testdata)
3243 def bad_flush():
3244 raise non_existing_flush
3245 def bad_close():
3246 raise non_existing_close
3247 buffer.close = bad_close
3248 txt = self.TextIOWrapper(buffer, encoding="ascii")
3249 txt.flush = bad_flush
3250 with self.assertRaises(NameError) as err: # exception not swallowed
3251 txt.close()
3252 self.assertIn('non_existing_close', str(err.exception))
3253 self.assertIsInstance(err.exception.__context__, NameError)
3254 self.assertIn('non_existing_flush', str(err.exception.__context__))
3255 self.assertFalse(txt.closed)
3256
Antoine Pitrou6be88762010-05-03 16:48:20 +00003257 def test_multi_close(self):
3258 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3259 txt.close()
3260 txt.close()
3261 txt.close()
3262 self.assertRaises(ValueError, txt.flush)
3263
Antoine Pitrou0d739d72010-09-05 23:01:12 +00003264 def test_unseekable(self):
3265 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
3266 self.assertRaises(self.UnsupportedOperation, txt.tell)
3267 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3268
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003269 def test_readonly_attributes(self):
3270 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3271 buf = self.BytesIO(self.testdata)
3272 with self.assertRaises(AttributeError):
3273 txt.buffer = buf
3274
Antoine Pitroue96ec682011-07-23 21:46:35 +02003275 def test_rawio(self):
3276 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3277 # that subprocess.Popen() can have the required unbuffered
3278 # semantics with universal_newlines=True.
3279 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3280 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3281 # Reads
3282 self.assertEqual(txt.read(4), 'abcd')
3283 self.assertEqual(txt.readline(), 'efghi\n')
3284 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3285
3286 def test_rawio_write_through(self):
3287 # Issue #12591: with write_through=True, writes don't need a flush
3288 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3289 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3290 write_through=True)
3291 txt.write('1')
3292 txt.write('23\n4')
3293 txt.write('5')
3294 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3295
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003296 def test_bufio_write_through(self):
3297 # Issue #21396: write_through=True doesn't force a flush()
3298 # on the underlying binary buffered object.
3299 flush_called, write_called = [], []
3300 class BufferedWriter(self.BufferedWriter):
3301 def flush(self, *args, **kwargs):
3302 flush_called.append(True)
3303 return super().flush(*args, **kwargs)
3304 def write(self, *args, **kwargs):
3305 write_called.append(True)
3306 return super().write(*args, **kwargs)
3307
3308 rawio = self.BytesIO()
3309 data = b"a"
3310 bufio = BufferedWriter(rawio, len(data)*2)
3311 textio = self.TextIOWrapper(bufio, encoding='ascii',
3312 write_through=True)
3313 # write to the buffered io but don't overflow the buffer
3314 text = data.decode('ascii')
3315 textio.write(text)
3316
3317 # buffer.flush is not called with write_through=True
3318 self.assertFalse(flush_called)
3319 # buffer.write *is* called with write_through=True
3320 self.assertTrue(write_called)
3321 self.assertEqual(rawio.getvalue(), b"") # no flush
3322
3323 write_called = [] # reset
3324 textio.write(text * 10) # total content is larger than bufio buffer
3325 self.assertTrue(write_called)
3326 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3327
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02003328 def test_reconfigure_write_through(self):
3329 raw = self.MockRawIO([])
3330 t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3331 t.write('1')
3332 t.reconfigure(write_through=True) # implied flush
3333 self.assertEqual(t.write_through, True)
3334 self.assertEqual(b''.join(raw._write_stack), b'1')
3335 t.write('23')
3336 self.assertEqual(b''.join(raw._write_stack), b'123')
3337 t.reconfigure(write_through=False)
3338 self.assertEqual(t.write_through, False)
3339 t.write('45')
3340 t.flush()
3341 self.assertEqual(b''.join(raw._write_stack), b'12345')
3342 # Keeping default value
3343 t.reconfigure()
3344 t.reconfigure(write_through=None)
3345 self.assertEqual(t.write_through, False)
3346 t.reconfigure(write_through=True)
3347 t.reconfigure()
3348 t.reconfigure(write_through=None)
3349 self.assertEqual(t.write_through, True)
3350
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003351 def test_read_nonbytes(self):
3352 # Issue #17106
3353 # Crash when underlying read() returns non-bytes
3354 t = self.TextIOWrapper(self.StringIO('a'))
3355 self.assertRaises(TypeError, t.read, 1)
3356 t = self.TextIOWrapper(self.StringIO('a'))
3357 self.assertRaises(TypeError, t.readline)
3358 t = self.TextIOWrapper(self.StringIO('a'))
3359 self.assertRaises(TypeError, t.read)
3360
Oren Milmana5b4ea12017-08-25 21:14:54 +03003361 def test_illegal_encoder(self):
3362 # Issue 31271: Calling write() while the return value of encoder's
3363 # encode() is invalid shouldn't cause an assertion failure.
3364 rot13 = codecs.lookup("rot13")
3365 with support.swap_attr(rot13, '_is_text_encoding', True):
3366 t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
3367 self.assertRaises(TypeError, t.write, 'bar')
3368
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003369 def test_illegal_decoder(self):
3370 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10003371 # Bypass the early encoding check added in issue 20404
3372 def _make_illegal_wrapper():
3373 quopri = codecs.lookup("quopri")
3374 quopri._is_text_encoding = True
3375 try:
3376 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3377 newline='\n', encoding="quopri")
3378 finally:
3379 quopri._is_text_encoding = False
3380 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003381 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10003382 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003383 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10003384 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003385 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10003386 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003387 self.assertRaises(TypeError, t.read)
3388
Oren Milmanba7d7362017-08-29 11:58:27 +03003389 # Issue 31243: calling read() while the return value of decoder's
3390 # getstate() is invalid should neither crash the interpreter nor
3391 # raise a SystemError.
3392 def _make_very_illegal_wrapper(getstate_ret_val):
3393 class BadDecoder:
3394 def getstate(self):
3395 return getstate_ret_val
3396 def _get_bad_decoder(dummy):
3397 return BadDecoder()
3398 quopri = codecs.lookup("quopri")
3399 with support.swap_attr(quopri, 'incrementaldecoder',
3400 _get_bad_decoder):
3401 return _make_illegal_wrapper()
3402 t = _make_very_illegal_wrapper(42)
3403 self.assertRaises(TypeError, t.read, 42)
3404 t = _make_very_illegal_wrapper(())
3405 self.assertRaises(TypeError, t.read, 42)
3406 t = _make_very_illegal_wrapper((1, 2))
3407 self.assertRaises(TypeError, t.read, 42)
3408
Antoine Pitrou712cb732013-12-21 15:51:54 +01003409 def _check_create_at_shutdown(self, **kwargs):
3410 # Issue #20037: creating a TextIOWrapper at shutdown
3411 # shouldn't crash the interpreter.
3412 iomod = self.io.__name__
3413 code = """if 1:
3414 import codecs
3415 import {iomod} as io
3416
3417 # Avoid looking up codecs at shutdown
3418 codecs.lookup('utf-8')
3419
3420 class C:
3421 def __init__(self):
3422 self.buf = io.BytesIO()
3423 def __del__(self):
3424 io.TextIOWrapper(self.buf, **{kwargs})
3425 print("ok")
3426 c = C()
3427 """.format(iomod=iomod, kwargs=kwargs)
3428 return assert_python_ok("-c", code)
3429
Serhiy Storchakaa7930372016-07-03 22:27:26 +03003430 @support.requires_type_collecting
Antoine Pitrou712cb732013-12-21 15:51:54 +01003431 def test_create_at_shutdown_without_encoding(self):
3432 rc, out, err = self._check_create_at_shutdown()
3433 if err:
3434 # Can error out with a RuntimeError if the module state
3435 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10003436 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01003437 else:
3438 self.assertEqual("ok", out.decode().strip())
3439
Serhiy Storchakaa7930372016-07-03 22:27:26 +03003440 @support.requires_type_collecting
Antoine Pitrou712cb732013-12-21 15:51:54 +01003441 def test_create_at_shutdown_with_encoding(self):
3442 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3443 errors='strict')
3444 self.assertFalse(err)
3445 self.assertEqual("ok", out.decode().strip())
3446
Antoine Pitroub8503892014-04-29 10:14:02 +02003447 def test_read_byteslike(self):
3448 r = MemviewBytesIO(b'Just some random string\n')
3449 t = self.TextIOWrapper(r, 'utf-8')
3450
3451 # TextIOwrapper will not read the full string, because
3452 # we truncate it to a multiple of the native int size
3453 # so that we can construct a more complex memoryview.
3454 bytes_val = _to_memoryview(r.getvalue()).tobytes()
3455
3456 self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3457
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003458 def test_issue22849(self):
3459 class F(object):
3460 def readable(self): return True
3461 def writable(self): return True
3462 def seekable(self): return True
3463
3464 for i in range(10):
3465 try:
3466 self.TextIOWrapper(F(), encoding='utf-8')
3467 except Exception:
3468 pass
3469
3470 F.tell = lambda x: 0
3471 t = self.TextIOWrapper(F(), encoding='utf-8')
3472
INADA Naoki507434f2017-12-21 09:59:53 +09003473 def test_reconfigure_encoding_read(self):
3474 # latin1 -> utf8
3475 # (latin1 can decode utf-8 encoded string)
3476 data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3477 raw = self.BytesIO(data)
3478 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3479 self.assertEqual(txt.readline(), 'abc\xe9\n')
3480 with self.assertRaises(self.UnsupportedOperation):
3481 txt.reconfigure(encoding='utf-8')
3482 with self.assertRaises(self.UnsupportedOperation):
3483 txt.reconfigure(newline=None)
3484
3485 def test_reconfigure_write_fromascii(self):
3486 # ascii has a specific encodefunc in the C implementation,
3487 # but utf-8-sig has not. Make sure that we get rid of the
3488 # cached encodefunc when we switch encoders.
3489 raw = self.BytesIO()
3490 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3491 txt.write('foo\n')
3492 txt.reconfigure(encoding='utf-8-sig')
3493 txt.write('\xe9\n')
3494 txt.flush()
3495 self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3496
3497 def test_reconfigure_write(self):
3498 # latin -> utf8
3499 raw = self.BytesIO()
3500 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3501 txt.write('abc\xe9\n')
3502 txt.reconfigure(encoding='utf-8')
3503 self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3504 txt.write('d\xe9f\n')
3505 txt.flush()
3506 self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3507
3508 # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3509 # the file
3510 raw = self.BytesIO()
3511 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3512 txt.write('abc\n')
3513 txt.reconfigure(encoding='utf-8-sig')
3514 txt.write('d\xe9f\n')
3515 txt.flush()
3516 self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3517
3518 def test_reconfigure_write_non_seekable(self):
3519 raw = self.BytesIO()
3520 raw.seekable = lambda: False
3521 raw.seek = None
3522 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3523 txt.write('abc\n')
3524 txt.reconfigure(encoding='utf-8-sig')
3525 txt.write('d\xe9f\n')
3526 txt.flush()
3527
3528 # If the raw stream is not seekable, there'll be a BOM
3529 self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3530
3531 def test_reconfigure_defaults(self):
3532 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
3533 txt.reconfigure(encoding=None)
3534 self.assertEqual(txt.encoding, 'ascii')
3535 self.assertEqual(txt.errors, 'replace')
3536 txt.write('LF\n')
3537
3538 txt.reconfigure(newline='\r\n')
3539 self.assertEqual(txt.encoding, 'ascii')
3540 self.assertEqual(txt.errors, 'replace')
3541
3542 txt.reconfigure(errors='ignore')
3543 self.assertEqual(txt.encoding, 'ascii')
3544 self.assertEqual(txt.errors, 'ignore')
3545 txt.write('CRLF\n')
3546
3547 txt.reconfigure(encoding='utf-8', newline=None)
3548 self.assertEqual(txt.errors, 'strict')
3549 txt.seek(0)
3550 self.assertEqual(txt.read(), 'LF\nCRLF\n')
3551
3552 self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3553
3554 def test_reconfigure_newline(self):
3555 raw = self.BytesIO(b'CR\rEOF')
3556 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3557 txt.reconfigure(newline=None)
3558 self.assertEqual(txt.readline(), 'CR\n')
3559 raw = self.BytesIO(b'CR\rEOF')
3560 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3561 txt.reconfigure(newline='')
3562 self.assertEqual(txt.readline(), 'CR\r')
3563 raw = self.BytesIO(b'CR\rLF\nEOF')
3564 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3565 txt.reconfigure(newline='\n')
3566 self.assertEqual(txt.readline(), 'CR\rLF\n')
3567 raw = self.BytesIO(b'LF\nCR\rEOF')
3568 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3569 txt.reconfigure(newline='\r')
3570 self.assertEqual(txt.readline(), 'LF\nCR\r')
3571 raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3572 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3573 txt.reconfigure(newline='\r\n')
3574 self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3575
3576 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3577 txt.reconfigure(newline=None)
3578 txt.write('linesep\n')
3579 txt.reconfigure(newline='')
3580 txt.write('LF\n')
3581 txt.reconfigure(newline='\n')
3582 txt.write('LF\n')
3583 txt.reconfigure(newline='\r')
3584 txt.write('CR\n')
3585 txt.reconfigure(newline='\r\n')
3586 txt.write('CRLF\n')
3587 expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3588 self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3589
Zackery Spytz23db9352018-06-29 04:14:58 -06003590 def test_issue25862(self):
3591 # Assertion failures occurred in tell() after read() and write().
3592 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3593 t.read(1)
3594 t.read()
3595 t.tell()
3596 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3597 t.read(1)
3598 t.write('x')
3599 t.tell()
3600
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003601
Antoine Pitroub8503892014-04-29 10:14:02 +02003602class MemviewBytesIO(io.BytesIO):
3603 '''A BytesIO object whose read method returns memoryviews
3604 rather than bytes'''
3605
3606 def read1(self, len_):
3607 return _to_memoryview(super().read1(len_))
3608
3609 def read(self, len_):
3610 return _to_memoryview(super().read(len_))
3611
3612def _to_memoryview(buf):
3613 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3614
3615 arr = array.array('i')
3616 idx = len(buf) - len(buf) % arr.itemsize
3617 arr.frombytes(buf[:idx])
3618 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003619
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003620
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003621class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003622 io = io
Steve Dowerfe8f4c92016-09-07 09:31:52 -07003623 shutdown_error = "RuntimeError: could not find io module state"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003624
3625 def test_initialization(self):
3626 r = self.BytesIO(b"\xc3\xa9\n\n")
3627 b = self.BufferedReader(r, 1000)
3628 t = self.TextIOWrapper(b)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003629 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
3630 self.assertRaises(ValueError, t.read)
3631
Benjamin Peterson10e76b62014-12-21 20:51:50 -06003632 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
3633 self.assertRaises(Exception, repr, t)
3634
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003635 def test_garbage_collection(self):
3636 # C TextIOWrapper objects are collected, and collecting them flushes
3637 # all data to disk.
3638 # The Python version has __del__, so it ends in gc.garbage instead.
Antoine Pitrou796564c2013-07-30 19:59:21 +02003639 with support.check_warnings(('', ResourceWarning)):
3640 rawio = io.FileIO(support.TESTFN, "wb")
3641 b = self.BufferedWriter(rawio)
3642 t = self.TextIOWrapper(b, encoding="ascii")
3643 t.write("456def")
3644 t.x = t
3645 wr = weakref.ref(t)
3646 del t
3647 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003648 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00003649 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003650 self.assertEqual(f.read(), b"456def")
3651
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003652 def test_rwpair_cleared_before_textio(self):
3653 # Issue 13070: TextIOWrapper's finalization would crash when called
3654 # after the reference to the underlying BufferedRWPair's writer got
3655 # cleared by the GC.
3656 for i in range(1000):
3657 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3658 t1 = self.TextIOWrapper(b1, encoding="ascii")
3659 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3660 t2 = self.TextIOWrapper(b2, encoding="ascii")
3661 # circular references
3662 t1.buddy = t2
3663 t2.buddy = t1
3664 support.gc_collect()
3665
3666
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003667class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003668 io = pyio
Steve Dower6d5cf8a2016-09-09 09:21:01 -07003669 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003670
3671
3672class IncrementalNewlineDecoderTest(unittest.TestCase):
3673
3674 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003675 # UTF-8 specific tests for a newline decoder
3676 def _check_decode(b, s, **kwargs):
3677 # We exercise getstate() / setstate() as well as decode()
3678 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003679 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003680 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003681 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003682
Antoine Pitrou180a3362008-12-14 16:36:46 +00003683 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003684
Antoine Pitrou180a3362008-12-14 16:36:46 +00003685 _check_decode(b'\xe8', "")
3686 _check_decode(b'\xa2', "")
3687 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003688
Antoine Pitrou180a3362008-12-14 16:36:46 +00003689 _check_decode(b'\xe8', "")
3690 _check_decode(b'\xa2', "")
3691 _check_decode(b'\x88', "\u8888")
3692
3693 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003694 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3695
Antoine Pitrou180a3362008-12-14 16:36:46 +00003696 decoder.reset()
3697 _check_decode(b'\n', "\n")
3698 _check_decode(b'\r', "")
3699 _check_decode(b'', "\n", final=True)
3700 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003701
Antoine Pitrou180a3362008-12-14 16:36:46 +00003702 _check_decode(b'\r', "")
3703 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003704
Antoine Pitrou180a3362008-12-14 16:36:46 +00003705 _check_decode(b'\r\r\n', "\n\n")
3706 _check_decode(b'\r', "")
3707 _check_decode(b'\r', "\n")
3708 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003709
Antoine Pitrou180a3362008-12-14 16:36:46 +00003710 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3711 _check_decode(b'\xe8\xa2\x88', "\u8888")
3712 _check_decode(b'\n', "\n")
3713 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3714 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003715
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003716 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003717 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003718 if encoding is not None:
3719 encoder = codecs.getincrementalencoder(encoding)()
3720 def _decode_bytewise(s):
3721 # Decode one byte at a time
3722 for b in encoder.encode(s):
3723 result.append(decoder.decode(bytes([b])))
3724 else:
3725 encoder = None
3726 def _decode_bytewise(s):
3727 # Decode one char at a time
3728 for c in s:
3729 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003730 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003731 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003732 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003733 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003734 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003735 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003736 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003737 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003738 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003739 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003740 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003741 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003742 input = "abc"
3743 if encoder is not None:
3744 encoder.reset()
3745 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003746 self.assertEqual(decoder.decode(input), "abc")
3747 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003748
3749 def test_newline_decoder(self):
3750 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003751 # None meaning the IncrementalNewlineDecoder takes unicode input
3752 # rather than bytes input
3753 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003754 'utf-16', 'utf-16-le', 'utf-16-be',
3755 'utf-32', 'utf-32-le', 'utf-32-be',
3756 )
3757 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003758 decoder = enc and codecs.getincrementaldecoder(enc)()
3759 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3760 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003761 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003762 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3763 self.check_newline_decoding_utf8(decoder)
Oren Milman1d1d3e92017-08-20 18:35:36 +03003764 self.assertRaises(TypeError, decoder.setstate, 42)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003765
Antoine Pitrou66913e22009-03-06 23:40:56 +00003766 def test_newline_bytes(self):
3767 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3768 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003769 self.assertEqual(dec.newlines, None)
3770 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3771 self.assertEqual(dec.newlines, None)
3772 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3773 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003774 dec = self.IncrementalNewlineDecoder(None, translate=False)
3775 _check(dec)
3776 dec = self.IncrementalNewlineDecoder(None, translate=True)
3777 _check(dec)
3778
Xiang Zhangb08746b2018-10-31 19:49:16 +08003779 def test_translate(self):
3780 # issue 35062
3781 for translate in (-2, -1, 1, 2):
3782 decoder = codecs.getincrementaldecoder("utf-8")()
3783 decoder = self.IncrementalNewlineDecoder(decoder, translate)
3784 self.check_newline_decoding_utf8(decoder)
3785 decoder = codecs.getincrementaldecoder("utf-8")()
3786 decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
3787 self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
3788
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003789class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3790 pass
3791
3792class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3793 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003794
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003795
Guido van Rossum01a27522007-03-07 01:00:12 +00003796# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003797
Guido van Rossum5abbf752007-08-27 17:39:33 +00003798class MiscIOTest(unittest.TestCase):
3799
Barry Warsaw40e82462008-11-20 20:14:50 +00003800 def tearDown(self):
3801 support.unlink(support.TESTFN)
3802
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003803 def test___all__(self):
3804 for name in self.io.__all__:
3805 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003806 self.assertIsNotNone(obj, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00003807 if name == "open":
3808 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003809 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003810 self.assertTrue(issubclass(obj, Exception), name)
3811 elif not name.startswith("SEEK_"):
3812 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003813
Barry Warsaw40e82462008-11-20 20:14:50 +00003814 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003815 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003816 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003817 f.close()
3818
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003819 with support.check_warnings(('', DeprecationWarning)):
3820 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003821 self.assertEqual(f.name, support.TESTFN)
3822 self.assertEqual(f.buffer.name, support.TESTFN)
3823 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3824 self.assertEqual(f.mode, "U")
3825 self.assertEqual(f.buffer.mode, "rb")
3826 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003827 f.close()
3828
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003829 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003830 self.assertEqual(f.mode, "w+")
3831 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3832 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003833
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003834 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003835 self.assertEqual(g.mode, "wb")
3836 self.assertEqual(g.raw.mode, "wb")
3837 self.assertEqual(g.name, f.fileno())
3838 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003839 f.close()
3840 g.close()
3841
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003842 def test_io_after_close(self):
3843 for kwargs in [
3844 {"mode": "w"},
3845 {"mode": "wb"},
3846 {"mode": "w", "buffering": 1},
3847 {"mode": "w", "buffering": 2},
3848 {"mode": "wb", "buffering": 0},
3849 {"mode": "r"},
3850 {"mode": "rb"},
3851 {"mode": "r", "buffering": 1},
3852 {"mode": "r", "buffering": 2},
3853 {"mode": "rb", "buffering": 0},
3854 {"mode": "w+"},
3855 {"mode": "w+b"},
3856 {"mode": "w+", "buffering": 1},
3857 {"mode": "w+", "buffering": 2},
3858 {"mode": "w+b", "buffering": 0},
3859 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003860 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003861 f.close()
3862 self.assertRaises(ValueError, f.flush)
3863 self.assertRaises(ValueError, f.fileno)
3864 self.assertRaises(ValueError, f.isatty)
3865 self.assertRaises(ValueError, f.__iter__)
3866 if hasattr(f, "peek"):
3867 self.assertRaises(ValueError, f.peek, 1)
3868 self.assertRaises(ValueError, f.read)
3869 if hasattr(f, "read1"):
3870 self.assertRaises(ValueError, f.read1, 1024)
Martin Panterccb2c0e2016-10-20 23:48:14 +00003871 self.assertRaises(ValueError, f.read1)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003872 if hasattr(f, "readall"):
3873 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003874 if hasattr(f, "readinto"):
3875 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003876 if hasattr(f, "readinto1"):
3877 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003878 self.assertRaises(ValueError, f.readline)
3879 self.assertRaises(ValueError, f.readlines)
Xiang Zhang026435c2017-04-15 12:47:28 +08003880 self.assertRaises(ValueError, f.readlines, 1)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003881 self.assertRaises(ValueError, f.seek, 0)
3882 self.assertRaises(ValueError, f.tell)
3883 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003884 self.assertRaises(ValueError, f.write,
3885 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003886 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003887 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003888
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003889 def test_blockingioerror(self):
3890 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003891 class C(str):
3892 pass
3893 c = C("")
3894 b = self.BlockingIOError(1, c)
3895 c.b = b
3896 b.c = c
3897 wr = weakref.ref(c)
3898 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003899 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003900 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003901
3902 def test_abcs(self):
3903 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003904 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3905 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3906 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3907 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003908
3909 def _check_abc_inheritance(self, abcmodule):
3910 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003911 self.assertIsInstance(f, abcmodule.IOBase)
3912 self.assertIsInstance(f, abcmodule.RawIOBase)
3913 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3914 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003915 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003916 self.assertIsInstance(f, abcmodule.IOBase)
3917 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3918 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3919 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003920 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003921 self.assertIsInstance(f, abcmodule.IOBase)
3922 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3923 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3924 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003925
3926 def test_abc_inheritance(self):
3927 # Test implementations inherit from their respective ABCs
3928 self._check_abc_inheritance(self)
3929
3930 def test_abc_inheritance_official(self):
3931 # Test implementations inherit from the official ABCs of the
3932 # baseline "io" module.
3933 self._check_abc_inheritance(io)
3934
Antoine Pitroue033e062010-10-29 10:38:18 +00003935 def _check_warn_on_dealloc(self, *args, **kwargs):
3936 f = open(*args, **kwargs)
3937 r = repr(f)
3938 with self.assertWarns(ResourceWarning) as cm:
3939 f = None
3940 support.gc_collect()
3941 self.assertIn(r, str(cm.warning.args[0]))
3942
3943 def test_warn_on_dealloc(self):
3944 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3945 self._check_warn_on_dealloc(support.TESTFN, "wb")
3946 self._check_warn_on_dealloc(support.TESTFN, "w")
3947
3948 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3949 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003950 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003951 for fd in fds:
3952 try:
3953 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003954 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003955 if e.errno != errno.EBADF:
3956 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003957 self.addCleanup(cleanup_fds)
3958 r, w = os.pipe()
3959 fds += r, w
3960 self._check_warn_on_dealloc(r, *args, **kwargs)
3961 # When using closefd=False, there's no warning
3962 r, w = os.pipe()
3963 fds += r, w
Serhiy Storchaka94a619d2016-02-11 13:11:44 +02003964 with support.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00003965 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00003966
3967 def test_warn_on_dealloc_fd(self):
3968 self._check_warn_on_dealloc_fd("rb", buffering=0)
3969 self._check_warn_on_dealloc_fd("rb")
3970 self._check_warn_on_dealloc_fd("r")
3971
3972
Antoine Pitrou243757e2010-11-05 21:15:39 +00003973 def test_pickling(self):
3974 # Pickling file objects is forbidden
3975 for kwargs in [
3976 {"mode": "w"},
3977 {"mode": "wb"},
3978 {"mode": "wb", "buffering": 0},
3979 {"mode": "r"},
3980 {"mode": "rb"},
3981 {"mode": "rb", "buffering": 0},
3982 {"mode": "w+"},
3983 {"mode": "w+b"},
3984 {"mode": "w+b", "buffering": 0},
3985 ]:
3986 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3987 with self.open(support.TESTFN, **kwargs) as f:
3988 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3989
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003990 def test_nonblock_pipe_write_bigbuf(self):
3991 self._test_nonblock_pipe_write(16*1024)
3992
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003993 def test_nonblock_pipe_write_smallbuf(self):
3994 self._test_nonblock_pipe_write(1024)
3995
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003996 @unittest.skipUnless(hasattr(os, 'set_blocking'),
3997 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003998 def _test_nonblock_pipe_write(self, bufsize):
3999 sent = []
4000 received = []
4001 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02004002 os.set_blocking(r, False)
4003 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004004
4005 # To exercise all code paths in the C implementation we need
4006 # to play with buffer sizes. For instance, if we choose a
4007 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
4008 # then we will never get a partial write of the buffer.
4009 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
4010 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
4011
4012 with rf, wf:
4013 for N in 9999, 73, 7574:
4014 try:
4015 i = 0
4016 while True:
4017 msg = bytes([i % 26 + 97]) * N
4018 sent.append(msg)
4019 wf.write(msg)
4020 i += 1
4021
4022 except self.BlockingIOError as e:
4023 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01004024 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004025 sent[-1] = sent[-1][:e.characters_written]
4026 received.append(rf.read())
4027 msg = b'BLOCKED'
4028 wf.write(msg)
4029 sent.append(msg)
4030
4031 while True:
4032 try:
4033 wf.flush()
4034 break
4035 except self.BlockingIOError as e:
4036 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01004037 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004038 self.assertEqual(e.characters_written, 0)
4039 received.append(rf.read())
4040
4041 received += iter(rf.read, None)
4042
4043 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03004044 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004045 self.assertTrue(wf.closed)
4046 self.assertTrue(rf.closed)
4047
Charles-François Natalidc3044c2012-01-09 22:40:02 +01004048 def test_create_fail(self):
4049 # 'x' mode fails if file is existing
4050 with self.open(support.TESTFN, 'w'):
4051 pass
4052 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
4053
4054 def test_create_writes(self):
4055 # 'x' mode opens for writing
4056 with self.open(support.TESTFN, 'xb') as f:
4057 f.write(b"spam")
4058 with self.open(support.TESTFN, 'rb') as f:
4059 self.assertEqual(b"spam", f.read())
4060
Christian Heimes7b648752012-09-10 14:48:43 +02004061 def test_open_allargs(self):
4062 # there used to be a buffer overflow in the parser for rawmode
4063 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
4064
4065
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004066class CMiscIOTest(MiscIOTest):
4067 io = io
4068
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03004069 def test_readinto_buffer_overflow(self):
4070 # Issue #18025
4071 class BadReader(self.io.BufferedIOBase):
4072 def read(self, n=-1):
4073 return b'x' * 10**6
4074 bufio = BadReader()
4075 b = bytearray(2)
4076 self.assertRaises(ValueError, bufio.readinto, b)
4077
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004078 def check_daemon_threads_shutdown_deadlock(self, stream_name):
4079 # Issue #23309: deadlocks at shutdown should be avoided when a
4080 # daemon thread and the main thread both write to a file.
4081 code = """if 1:
4082 import sys
4083 import time
4084 import threading
Victor Stinner2a1aed02017-04-21 17:59:23 +02004085 from test.support import SuppressCrashReport
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004086
4087 file = sys.{stream_name}
4088
4089 def run():
4090 while True:
4091 file.write('.')
4092 file.flush()
4093
Victor Stinner2a1aed02017-04-21 17:59:23 +02004094 crash = SuppressCrashReport()
4095 crash.__enter__()
4096 # don't call __exit__(): the crash occurs at Python shutdown
4097
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004098 thread = threading.Thread(target=run)
4099 thread.daemon = True
4100 thread.start()
4101
4102 time.sleep(0.5)
4103 file.write('!')
4104 file.flush()
4105 """.format_map(locals())
4106 res, _ = run_python_until_end("-c", code)
4107 err = res.err.decode()
4108 if res.rc != 0:
4109 # Failure: should be a fatal error
4110 self.assertIn("Fatal Python error: could not acquire lock "
4111 "for <_io.BufferedWriter name='<{stream_name}>'> "
4112 "at interpreter shutdown, possibly due to "
4113 "daemon threads".format_map(locals()),
4114 err)
4115 else:
4116 self.assertFalse(err.strip('.!'))
4117
4118 def test_daemon_threads_shutdown_stdout_deadlock(self):
4119 self.check_daemon_threads_shutdown_deadlock('stdout')
4120
4121 def test_daemon_threads_shutdown_stderr_deadlock(self):
4122 self.check_daemon_threads_shutdown_deadlock('stderr')
4123
4124
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004125class PyMiscIOTest(MiscIOTest):
4126 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00004127
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004128
4129@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
4130class SignalsTest(unittest.TestCase):
4131
4132 def setUp(self):
4133 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
4134
4135 def tearDown(self):
4136 signal.signal(signal.SIGALRM, self.oldalrm)
4137
4138 def alarm_interrupt(self, sig, frame):
4139 1/0
4140
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004141 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
4142 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00004143 invokes the signal handler, and bubbles up the exception raised
4144 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004145 read_results = []
4146 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02004147 if hasattr(signal, 'pthread_sigmask'):
4148 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004149 s = os.read(r, 1)
4150 read_results.append(s)
4151 t = threading.Thread(target=_read)
4152 t.daemon = True
4153 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00004154 fdopen_kwargs["closefd"] = False
Victor Stinner9c182a12015-03-16 18:03:06 +01004155 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004156 try:
4157 wio = self.io.open(w, **fdopen_kwargs)
4158 t.start()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004159 # Fill the pipe enough that the write will be blocking.
4160 # It will be interrupted by the timer armed above. Since the
4161 # other thread has read one byte, the low-level write will
4162 # return with a successful (partial) result rather than an EINTR.
4163 # The buffered IO layer must check for pending signal
4164 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02004165 signal.alarm(1)
4166 try:
Victor Stinner9c182a12015-03-16 18:03:06 +01004167 self.assertRaises(ZeroDivisionError, wio.write, large_data)
Victor Stinner775b2dd2013-07-15 19:53:13 +02004168 finally:
4169 signal.alarm(0)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03004170 t.join()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004171 # We got one byte, get another one and check that it isn't a
4172 # repeat of the first one.
4173 read_results.append(os.read(r, 1))
4174 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
4175 finally:
4176 os.close(w)
4177 os.close(r)
4178 # This is deliberate. If we didn't close the file descriptor
4179 # before closing wio, wio would try to flush its internal
4180 # buffer, and block again.
4181 try:
4182 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004183 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004184 if e.errno != errno.EBADF:
4185 raise
4186
4187 def test_interrupted_write_unbuffered(self):
4188 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
4189
4190 def test_interrupted_write_buffered(self):
4191 self.check_interrupted_write(b"xy", b"xy", mode="wb")
4192
4193 def test_interrupted_write_text(self):
4194 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
4195
Brett Cannon31f59292011-02-21 19:29:56 +00004196 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004197 def check_reentrant_write(self, data, **fdopen_kwargs):
4198 def on_alarm(*args):
4199 # Will be called reentrantly from the same thread
4200 wio.write(data)
4201 1/0
4202 signal.signal(signal.SIGALRM, on_alarm)
4203 r, w = os.pipe()
4204 wio = self.io.open(w, **fdopen_kwargs)
4205 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004206 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004207 # Either the reentrant call to wio.write() fails with RuntimeError,
4208 # or the signal handler raises ZeroDivisionError.
4209 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
4210 while 1:
4211 for i in range(100):
4212 wio.write(data)
4213 wio.flush()
4214 # Make sure the buffer doesn't fill up and block further writes
4215 os.read(r, len(data) * 100)
4216 exc = cm.exception
4217 if isinstance(exc, RuntimeError):
4218 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
4219 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004220 signal.alarm(0)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004221 wio.close()
4222 os.close(r)
4223
4224 def test_reentrant_write_buffered(self):
4225 self.check_reentrant_write(b"xy", mode="wb")
4226
4227 def test_reentrant_write_text(self):
4228 self.check_reentrant_write("xy", mode="w", encoding="ascii")
4229
Antoine Pitrou707ce822011-02-25 21:24:11 +00004230 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
4231 """Check that a buffered read, when it gets interrupted (either
4232 returning a partial result or EINTR), properly invokes the signal
4233 handler and retries if the latter returned successfully."""
4234 r, w = os.pipe()
4235 fdopen_kwargs["closefd"] = False
4236 def alarm_handler(sig, frame):
4237 os.write(w, b"bar")
4238 signal.signal(signal.SIGALRM, alarm_handler)
4239 try:
4240 rio = self.io.open(r, **fdopen_kwargs)
4241 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004242 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004243 # Expected behaviour:
4244 # - first raw read() returns partial b"foo"
4245 # - second raw read() returns EINTR
4246 # - third raw read() returns b"bar"
4247 self.assertEqual(decode(rio.read(6)), "foobar")
4248 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004249 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004250 rio.close()
4251 os.close(w)
4252 os.close(r)
4253
Antoine Pitrou20db5112011-08-19 20:32:34 +02004254 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004255 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
4256 mode="rb")
4257
Antoine Pitrou20db5112011-08-19 20:32:34 +02004258 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004259 self.check_interrupted_read_retry(lambda x: x,
4260 mode="r")
4261
Antoine Pitrou707ce822011-02-25 21:24:11 +00004262 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
4263 """Check that a buffered write, when it gets interrupted (either
4264 returning a partial result or EINTR), properly invokes the signal
4265 handler and retries if the latter returned successfully."""
4266 select = support.import_module("select")
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004267
Antoine Pitrou707ce822011-02-25 21:24:11 +00004268 # A quantity that exceeds the buffer size of an anonymous pipe's
4269 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02004270 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00004271 r, w = os.pipe()
4272 fdopen_kwargs["closefd"] = False
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004273
Antoine Pitrou707ce822011-02-25 21:24:11 +00004274 # We need a separate thread to read from the pipe and allow the
4275 # write() to finish. This thread is started after the SIGALRM is
4276 # received (forcing a first EINTR in write()).
4277 read_results = []
4278 write_finished = False
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004279 error = None
Antoine Pitrou707ce822011-02-25 21:24:11 +00004280 def _read():
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004281 try:
4282 while not write_finished:
4283 while r in select.select([r], [], [], 1.0)[0]:
4284 s = os.read(r, 1024)
4285 read_results.append(s)
4286 except BaseException as exc:
4287 nonlocal error
4288 error = exc
Antoine Pitrou707ce822011-02-25 21:24:11 +00004289 t = threading.Thread(target=_read)
4290 t.daemon = True
4291 def alarm1(sig, frame):
4292 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004293 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004294 def alarm2(sig, frame):
4295 t.start()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004296
4297 large_data = item * N
Antoine Pitrou707ce822011-02-25 21:24:11 +00004298 signal.signal(signal.SIGALRM, alarm1)
4299 try:
4300 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004301 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004302 # Expected behaviour:
4303 # - first raw write() is partial (because of the limited pipe buffer
4304 # and the first alarm)
4305 # - second raw write() returns EINTR (because of the second alarm)
4306 # - subsequent write()s are successful (either partial or complete)
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004307 written = wio.write(large_data)
4308 self.assertEqual(N, written)
4309
Antoine Pitrou707ce822011-02-25 21:24:11 +00004310 wio.flush()
4311 write_finished = True
4312 t.join()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004313
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004314 self.assertIsNone(error)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004315 self.assertEqual(N, sum(len(x) for x in read_results))
4316 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004317 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004318 write_finished = True
4319 os.close(w)
4320 os.close(r)
4321 # This is deliberate. If we didn't close the file descriptor
4322 # before closing wio, wio would try to flush its internal
4323 # buffer, and could block (in case of failure).
4324 try:
4325 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004326 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00004327 if e.errno != errno.EBADF:
4328 raise
4329
Antoine Pitrou20db5112011-08-19 20:32:34 +02004330 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004331 self.check_interrupted_write_retry(b"x", mode="wb")
4332
Antoine Pitrou20db5112011-08-19 20:32:34 +02004333 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004334 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4335
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004336
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004337class CSignalsTest(SignalsTest):
4338 io = io
4339
4340class PySignalsTest(SignalsTest):
4341 io = pyio
4342
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004343 # Handling reentrancy issues would slow down _pyio even more, so the
4344 # tests are disabled.
4345 test_reentrant_write_buffered = None
4346 test_reentrant_write_text = None
4347
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004348
Ezio Melottidaa42c72013-03-23 16:30:16 +02004349def load_tests(*args):
Gregory P. Smith1bef9072015-04-14 13:24:34 -07004350 tests = (CIOTest, PyIOTest, APIMismatchTest,
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004351 CBufferedReaderTest, PyBufferedReaderTest,
4352 CBufferedWriterTest, PyBufferedWriterTest,
4353 CBufferedRWPairTest, PyBufferedRWPairTest,
4354 CBufferedRandomTest, PyBufferedRandomTest,
4355 StatefulIncrementalDecoderTest,
4356 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
4357 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004358 CMiscIOTest, PyMiscIOTest,
4359 CSignalsTest, PySignalsTest,
4360 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004361
4362 # Put the namespaces of the IO module we are testing and some useful mock
4363 # classes in the __dict__ of each test.
4364 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
benfogle9703f092017-11-10 16:03:40 -05004365 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
4366 SlowFlushRawIO)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004367 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
4368 c_io_ns = {name : getattr(io, name) for name in all_members}
4369 py_io_ns = {name : getattr(pyio, name) for name in all_members}
4370 globs = globals()
4371 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
4372 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
4373 # Avoid turning open into a bound method.
4374 py_io_ns["open"] = pyio.OpenWrapper
4375 for test in tests:
4376 if test.__name__.startswith("C"):
4377 for name, obj in c_io_ns.items():
4378 setattr(test, name, obj)
4379 elif test.__name__.startswith("Py"):
4380 for name, obj in py_io_ns.items():
4381 setattr(test, name, obj)
4382
Ezio Melottidaa42c72013-03-23 16:30:16 +02004383 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
4384 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004385
4386if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02004387 unittest.main()