"""Unit tests for the io module."""

# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#                     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Berker Peksagce643912015-05-06 06:33:17 +030038from test.support.script_helper import assert_python_ok, run_python_until_end
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000047
try:
    import ctypes
except ImportError:
    def byteslike(*pos, **kw):
        """Fallback without ctypes: wrap the bytes in a signed-char array.

        The arguments are passed straight to bytes().
        """
        return array.array("b", bytes(*pos, **kw))
else:
    class EmptyStruct(ctypes.Structure):
        """Field-less ctypes structure, resized on demand to hold raw data."""

    def byteslike(*pos, **kw):
        """Create a bytes-like object having no string or sequence methods"""
        # Build the payload exactly as bytes() would, then copy it into a
        # structure that has been grown to the payload's size.
        payload = bytes(*pos, **kw)
        holder = EmptyStruct()
        ctypes.resize(holder, len(payload))
        view = memoryview(holder).cast("B")
        view[:] = payload
        return holder
63
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000064def _default_chunk_size():
65 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000066 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000067 return f._CHUNK_SIZE
68
69
class MockRawIOWithoutRead:
    """Raw stream mock that deliberately provides no read().

    Mixed in with a RawIOBase class, it exercises the default
    RawIOBase.read(), which is implemented on top of readinto().  Data to
    read is served from a queue of chunks; written data is recorded.
    """

    def __init__(self, read_stack=()):
        # Queue of chunks still to be served by readinto(); a None entry
        # makes one readinto() call return None.
        self._read_stack = list(read_stack)
        # Every write() call appends a bytes copy of its argument here.
        self._write_stack = []
        # Counters: total read attempts, and attempts made on an empty queue.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        """Record a bytes copy of *b* and report it as fully written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next queued chunk (or as much as fits) into *buf*.

        Returns the number of bytes copied, None when the queue head is
        None, or 0 when the queue is empty.
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            self._read_stack.pop(0)
            return None
        size = len(chunk)
        if size > capacity:
            # Chunk does not fit: hand out a prefix and keep the remainder
            # at the head of the queue.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        self._read_stack.pop(0)
        buf[:size] = chunk
        return size

    def truncate(self, pos=None):
        return pos
125
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead combined with the C implementation's RawIOBase."""
128
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead combined with the Python (_pyio) RawIOBase."""
131
132
class MockRawIO(MockRawIOWithoutRead):
    """Raw stream mock whose read() hands out the queued chunks whole."""

    def read(self, n=None):
        """Return the next queued chunk; *n* is ignored.

        Every call bumps the read counter.  Once the queue is empty the
        call is also counted as extraneous and b"" is returned.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue is expected here; anything else
            # (e.g. KeyboardInterrupt) must propagate.
            self._extraneous_reads += 1
            return b""
142
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with the C implementation's RawIOBase."""
145
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with the Python (_pyio) RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000148
Guido van Rossuma9e20242007-03-08 00:43:48 +0000149
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant that reports bogus results from every operation."""

    def write(self, b):
        # Claim twice as many bytes as were actually recorded.
        written = super().write(b)
        return written * 2

    def read(self, n=None):
        # Hand back the chunk duplicated.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Fill the buffer normally, then lie about how much was read.
        super().readinto(buf)
        return 5 * len(buf)
166
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with the C implementation's RawIOBase."""
169
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with the Python (_pyio) RawIOBase."""
172
173
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() marks it closed and raises OSError."""

    closed = 0

    def close(self):
        # Later calls are silent no-ops.
        if self.closed:
            return
        self.closed = 1
        raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000181
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with the C implementation's RawIOBase."""
184
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with the Python (_pyio) RawIOBase."""
187
188
class MockFileIO:
    """Mixin that logs the result size of every read() and readinto() call.

    Meant to be combined with a concrete stream class that supplies the
    real __init__(), read() and readinto().
    """

    def __init__(self, data):
        # One entry per read()/readinto() call, in call order.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        result = super().read(n)
        if result is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(result))
        return result

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000204
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO layered over the C implementation's BytesIO."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000207
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO layered over the Python (_pyio) BytesIO."""
210
211
class MockUnseekableIO:
    """Mixin that makes a stream report itself as not seekable.

    seek(), tell() and truncate() all raise self.UnsupportedOperation, which
    the concrete subclass is expected to provide as a class attribute.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error = self.UnsupportedOperation
        raise error("not seekable")

    def tell(self, *args):
        error = self.UnsupportedOperation
        raise error("not seekable")

    def truncate(self, *args):
        error = self.UnsupportedOperation
        raise error("not seekable")
224
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """MockUnseekableIO over the C BytesIO, raising io.UnsupportedOperation."""

    UnsupportedOperation = io.UnsupportedOperation
227
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """MockUnseekableIO over the Python BytesIO, raising pyio.UnsupportedOperation."""

    UnsupportedOperation = pyio.UnsupportedOperation
230
231
class MockNonBlockWriterIO:
    """Writer mock that can pretend a write would block.

    Written data is accumulated in chunks.  After block_on() has been given
    a marker, a write stops just before the marker's first occurrence.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far, joined, and forget it."""
        written = b"".join(self._write_stack)
        self._write_stack.clear()
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record *b*, honouring the blocker marker if one is set.

        Returns the number of bytes accepted, or None when the marker is the
        very first byte (which also cancels the marker).
        """
        data = bytes(b)
        marker = self._blocker_char
        if marker:
            where = data.find(marker)
            if where > 0:
                # write data up to the first blocker
                self._write_stack.append(data[:where])
                return where
            if where == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(data)
        return len(data)
275
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO combined with the C implementation's RawIOBase."""

    BlockingIOError = io.BlockingIOError
278
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO combined with the Python (_pyio) RawIOBase."""

    BlockingIOError = pyio.BlockingIOError
281
Guido van Rossuma9e20242007-03-08 00:43:48 +0000282
Guido van Rossum28524c72007-02-27 05:47:44 +0000283class IOTest(unittest.TestCase):
284
Neal Norwitze7789b12008-03-24 06:18:09 +0000285 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000286 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000287
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000288 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000289 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000290
Guido van Rossum28524c72007-02-27 05:47:44 +0000291 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000292 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000293 f.truncate(0)
294 self.assertEqual(f.tell(), 5)
295 f.seek(0)
296
297 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000298 self.assertEqual(f.seek(0), 0)
299 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000300 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000301 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000302 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000303 buffer = bytearray(b" world\n\n\n")
304 self.assertEqual(f.write(buffer), 9)
305 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000306 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000307 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000308 self.assertEqual(f.seek(-1, 2), 13)
309 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000310
Guido van Rossum87429772007-04-10 21:06:59 +0000311 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000312 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000313 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000314
Guido van Rossum9b76da62007-04-11 01:09:03 +0000315 def read_ops(self, f, buffered=False):
316 data = f.read(5)
317 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000318 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000319 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000320 self.assertEqual(bytes(data), b" worl")
321 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000322 self.assertEqual(f.readinto(data), 2)
323 self.assertEqual(len(data), 5)
324 self.assertEqual(data[:2], b"d\n")
325 self.assertEqual(f.seek(0), 0)
326 self.assertEqual(f.read(20), b"hello world\n")
327 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000328 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000329 self.assertEqual(f.seek(-6, 2), 6)
330 self.assertEqual(f.read(5), b"world")
331 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000332 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000333 self.assertEqual(f.seek(-6, 1), 5)
334 self.assertEqual(f.read(5), b" worl")
335 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000336 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000337 if buffered:
338 f.seek(0)
339 self.assertEqual(f.read(), b"hello world\n")
340 f.seek(6)
341 self.assertEqual(f.read(), b"world\n")
342 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000343 f.seek(0)
344 data = byteslike(5)
345 self.assertEqual(f.readinto1(data), 5)
346 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000347
Guido van Rossum34d69e52007-04-10 20:08:41 +0000348 LARGE = 2**31
349
Guido van Rossum53807da2007-04-10 19:01:47 +0000350 def large_file_ops(self, f):
351 assert f.readable()
352 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100353 try:
354 self.assertEqual(f.seek(self.LARGE), self.LARGE)
355 except (OverflowError, ValueError):
356 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000357 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000358 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000359 self.assertEqual(f.tell(), self.LARGE + 3)
360 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000361 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000362 self.assertEqual(f.tell(), self.LARGE + 2)
363 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000364 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000365 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000366 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
367 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000368 self.assertEqual(f.read(2), b"x")
369
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000370 def test_invalid_operations(self):
371 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000372 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000373 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000374 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000375 self.assertRaises(exc, fp.read)
376 self.assertRaises(exc, fp.readline)
377 with self.open(support.TESTFN, "wb", buffering=0) as fp:
378 self.assertRaises(exc, fp.read)
379 self.assertRaises(exc, fp.readline)
380 with self.open(support.TESTFN, "rb", buffering=0) as fp:
381 self.assertRaises(exc, fp.write, b"blah")
382 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000383 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000384 self.assertRaises(exc, fp.write, b"blah")
385 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000386 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000387 self.assertRaises(exc, fp.write, "blah")
388 self.assertRaises(exc, fp.writelines, ["blah\n"])
389 # Non-zero seeking from current or end pos
390 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
391 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000392
Martin Panter754aab22016-03-31 07:21:56 +0000393 def test_optional_abilities(self):
394 # Test for OSError when optional APIs are not supported
395 # The purpose of this test is to try fileno(), reading, writing and
396 # seeking operations with various objects that indicate they do not
397 # support these operations.
398
399 def pipe_reader():
400 [r, w] = os.pipe()
401 os.close(w) # So that read() is harmless
402 return self.FileIO(r, "r")
403
404 def pipe_writer():
405 [r, w] = os.pipe()
406 self.addCleanup(os.close, r)
407 # Guarantee that we can write into the pipe without blocking
408 thread = threading.Thread(target=os.read, args=(r, 100))
409 thread.start()
410 self.addCleanup(thread.join)
411 return self.FileIO(w, "w")
412
413 def buffered_reader():
414 return self.BufferedReader(self.MockUnseekableIO())
415
416 def buffered_writer():
417 return self.BufferedWriter(self.MockUnseekableIO())
418
419 def buffered_random():
420 return self.BufferedRandom(self.BytesIO())
421
422 def buffered_rw_pair():
423 return self.BufferedRWPair(self.MockUnseekableIO(),
424 self.MockUnseekableIO())
425
426 def text_reader():
427 class UnseekableReader(self.MockUnseekableIO):
428 writable = self.BufferedIOBase.writable
429 write = self.BufferedIOBase.write
430 return self.TextIOWrapper(UnseekableReader(), "ascii")
431
432 def text_writer():
433 class UnseekableWriter(self.MockUnseekableIO):
434 readable = self.BufferedIOBase.readable
435 read = self.BufferedIOBase.read
436 return self.TextIOWrapper(UnseekableWriter(), "ascii")
437
438 tests = (
439 (pipe_reader, "fr"), (pipe_writer, "fw"),
440 (buffered_reader, "r"), (buffered_writer, "w"),
441 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
442 (text_reader, "r"), (text_writer, "w"),
443 (self.BytesIO, "rws"), (self.StringIO, "rws"),
444 )
445 for [test, abilities] in tests:
446 if test is pipe_writer and not threading:
447 continue # Skip subtest that uses a background thread
448 with self.subTest(test), test() as obj:
449 readable = "r" in abilities
450 self.assertEqual(obj.readable(), readable)
451 writable = "w" in abilities
452 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000453
454 if isinstance(obj, self.TextIOBase):
455 data = "3"
456 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
457 data = b"3"
458 else:
459 self.fail("Unknown base class")
460
461 if "f" in abilities:
462 obj.fileno()
463 else:
464 self.assertRaises(OSError, obj.fileno)
465
466 if readable:
467 obj.read(1)
468 obj.read()
469 else:
470 self.assertRaises(OSError, obj.read, 1)
471 self.assertRaises(OSError, obj.read)
472
473 if writable:
474 obj.write(data)
475 else:
476 self.assertRaises(OSError, obj.write, data)
477
Martin Panter3ee147f2016-03-31 21:05:31 +0000478 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000479 pipe_reader, pipe_writer):
480 # Pipes seem to appear as seekable on Windows
481 continue
482 seekable = "s" in abilities
483 self.assertEqual(obj.seekable(), seekable)
484
Martin Panter754aab22016-03-31 07:21:56 +0000485 if seekable:
486 obj.tell()
487 obj.seek(0)
488 else:
489 self.assertRaises(OSError, obj.tell)
490 self.assertRaises(OSError, obj.seek, 0)
491
492 if writable and seekable:
493 obj.truncate()
494 obj.truncate(0)
495 else:
496 self.assertRaises(OSError, obj.truncate)
497 self.assertRaises(OSError, obj.truncate, 0)
498
Antoine Pitrou13348842012-01-29 18:36:34 +0100499 def test_open_handles_NUL_chars(self):
500 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300501 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100502
503 bytes_fn = bytes(fn_with_NUL, 'ascii')
504 with warnings.catch_warnings():
505 warnings.simplefilter("ignore", DeprecationWarning)
506 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100507
Guido van Rossum28524c72007-02-27 05:47:44 +0000508 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000509 with self.open(support.TESTFN, "wb", buffering=0) as f:
510 self.assertEqual(f.readable(), False)
511 self.assertEqual(f.writable(), True)
512 self.assertEqual(f.seekable(), True)
513 self.write_ops(f)
514 with self.open(support.TESTFN, "rb", buffering=0) as f:
515 self.assertEqual(f.readable(), True)
516 self.assertEqual(f.writable(), False)
517 self.assertEqual(f.seekable(), True)
518 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000519
Guido van Rossum87429772007-04-10 21:06:59 +0000520 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000521 with self.open(support.TESTFN, "wb") as f:
522 self.assertEqual(f.readable(), False)
523 self.assertEqual(f.writable(), True)
524 self.assertEqual(f.seekable(), True)
525 self.write_ops(f)
526 with self.open(support.TESTFN, "rb") as f:
527 self.assertEqual(f.readable(), True)
528 self.assertEqual(f.writable(), False)
529 self.assertEqual(f.seekable(), True)
530 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000531
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000532 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000533 with self.open(support.TESTFN, "wb") as f:
534 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
535 with self.open(support.TESTFN, "rb") as f:
536 self.assertEqual(f.readline(), b"abc\n")
537 self.assertEqual(f.readline(10), b"def\n")
538 self.assertEqual(f.readline(2), b"xy")
539 self.assertEqual(f.readline(4), b"zzy\n")
540 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000541 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000542 self.assertRaises(TypeError, f.readline, 5.3)
543 with self.open(support.TESTFN, "r") as f:
544 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000545
Serhiy Storchaka680fea42017-04-19 21:22:49 +0300546 def test_readline_nonsizeable(self):
547 # Issue #30061
548 # Crash when readline() returns an object without __len__
549 class R(self.IOBase):
550 def readline(self):
551 return None
552 self.assertRaises((TypeError, StopIteration), next, R())
553
554 def test_next_nonsizeable(self):
555 # Issue #30061
556 # Crash when __next__() returns an object without __len__
557 class R(self.IOBase):
558 def __next__(self):
559 return None
560 self.assertRaises(TypeError, R().readlines, 1)
561
Guido van Rossum28524c72007-02-27 05:47:44 +0000562 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000563 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000564 self.write_ops(f)
565 data = f.getvalue()
566 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000567 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000568 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000569
Guido van Rossum53807da2007-04-10 19:01:47 +0000570 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000571 # On Windows and Mac OSX this test comsumes large resources; It takes
572 # a long time to build the >2GB file and takes >2GB of disk space
573 # therefore the resource must be enabled to run this test.
574 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600575 support.requires(
576 'largefile',
577 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000578 with self.open(support.TESTFN, "w+b", 0) as f:
579 self.large_file_ops(f)
580 with self.open(support.TESTFN, "w+b") as f:
581 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000582
583 def test_with_open(self):
584 for bufsize in (0, 1, 100):
585 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000586 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000587 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000588 self.assertEqual(f.closed, True)
589 f = None
590 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000591 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000592 1/0
593 except ZeroDivisionError:
594 self.assertEqual(f.closed, True)
595 else:
596 self.fail("1/0 didn't raise an exception")
597
    # issue 5008
599 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000600 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000601 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000602 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000603 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000604 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000605 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000606 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300607 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000608
Guido van Rossum87429772007-04-10 21:06:59 +0000609 def test_destructor(self):
610 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000611 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000612 def __del__(self):
613 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000614 try:
615 f = super().__del__
616 except AttributeError:
617 pass
618 else:
619 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000620 def close(self):
621 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000622 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000623 def flush(self):
624 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000625 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000626 with support.check_warnings(('', ResourceWarning)):
627 f = MyFileIO(support.TESTFN, "wb")
628 f.write(b"xxx")
629 del f
630 support.gc_collect()
631 self.assertEqual(record, [1, 2, 3])
632 with self.open(support.TESTFN, "rb") as f:
633 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000634
635 def _check_base_destructor(self, base):
636 record = []
637 class MyIO(base):
638 def __init__(self):
639 # This exercises the availability of attributes on object
640 # destruction.
641 # (in the C version, close() is called by the tp_dealloc
642 # function, not by __del__)
643 self.on_del = 1
644 self.on_close = 2
645 self.on_flush = 3
646 def __del__(self):
647 record.append(self.on_del)
648 try:
649 f = super().__del__
650 except AttributeError:
651 pass
652 else:
653 f()
654 def close(self):
655 record.append(self.on_close)
656 super().close()
657 def flush(self):
658 record.append(self.on_flush)
659 super().flush()
660 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000661 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000662 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000663 self.assertEqual(record, [1, 2, 3])
664
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000665 def test_IOBase_destructor(self):
666 self._check_base_destructor(self.IOBase)
667
668 def test_RawIOBase_destructor(self):
669 self._check_base_destructor(self.RawIOBase)
670
671 def test_BufferedIOBase_destructor(self):
672 self._check_base_destructor(self.BufferedIOBase)
673
674 def test_TextIOBase_destructor(self):
675 self._check_base_destructor(self.TextIOBase)
676
Guido van Rossum87429772007-04-10 21:06:59 +0000677 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000678 with self.open(support.TESTFN, "wb") as f:
679 f.write(b"xxx")
680 with self.open(support.TESTFN, "rb") as f:
681 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000682
Guido van Rossumd4103952007-04-12 05:44:49 +0000683 def test_array_writes(self):
684 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000685 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000686 def check(f):
687 with f:
688 self.assertEqual(f.write(a), n)
689 f.writelines((a,))
690 check(self.BytesIO())
691 check(self.FileIO(support.TESTFN, "w"))
692 check(self.BufferedWriter(self.MockRawIO()))
693 check(self.BufferedRandom(self.MockRawIO()))
694 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000695
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000696 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000697 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000698 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000699
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000700 def test_read_closed(self):
701 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000702 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000703 with self.open(support.TESTFN, "r") as f:
704 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000705 self.assertEqual(file.read(), "egg\n")
706 file.seek(0)
707 file.close()
708 self.assertRaises(ValueError, file.read)
709
710 def test_no_closefd_with_filename(self):
711 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000712 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000713
714 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000715 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000716 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000717 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000718 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000719 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000720 self.assertEqual(file.buffer.raw.closefd, False)
721
def test_garbage_collection(self):
    # FileIO objects are collected, and collecting them flushes
    # all data to disk.
    payload = b"abcxxx"
    with support.check_warnings(('', ResourceWarning)):
        raw = self.FileIO(support.TESTFN, "wb")
        raw.write(payload)
        # Make the object reference itself so that only a gc pass can
        # reclaim it.
        raw.f = raw
        ref = weakref.ref(raw)
        del raw
        support.gc_collect()
    self.assertIsNone(ref(), ref)
    with self.open(support.TESTFN, "rb") as reader:
        self.assertEqual(reader.read(), payload)
Christian Heimesecc42a22008-11-05 19:30:32 +0000735
def test_unbounded_file(self):
    # Issue #1174606: reading from an unbounded stream such as /dev/zero.
    zero = "/dev/zero"
    if not os.path.exists(zero):
        self.skipTest("{0} does not exist".format(zero))
    if sys.maxsize > 0x7FFFFFFF:
        self.skipTest("test can only run in a 32-bit address space")
    if support.real_max_memuse < support._2G:
        self.skipTest("test requires at least 2GB of memory")
    # An unbounded read() must overflow at every layer: raw, buffered
    # and text, in that order.
    layers = (
        ("rb", {"buffering": 0}),
        ("rb", {}),
        ("r", {}),
    )
    for mode, extra in layers:
        with self.open(zero, mode, **extra) as stream:
            with self.assertRaises(OverflowError):
                stream.read()
751
def check_flush_error_on_close(self, *args, **kwargs):
    # Helper: open a file with the given arguments, make its flush() fail,
    # and verify that close() still closes the file, propagates the error
    # and calls flush() while the file is still open.
    stream = self.open(*args, **kwargs)
    closed_at_flush = []

    def failing_flush():
        # Record whether the file was already closed when flush() ran.
        closed_at_flush[:] = [stream.closed]
        raise OSError()

    stream.flush = failing_flush
    with self.assertRaises(OSError):  # exception not swallowed
        stream.close()
    self.assertTrue(stream.closed)
    self.assertTrue(closed_at_flush)      # flush() called
    self.assertFalse(closed_at_flush[0])  # flush() called before file closed
    stream.flush = lambda: None  # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200766
def test_flush_error_on_close(self):
    # Issue #5700: io.FileIO calls flush() after file closed.
    #
    # Run check_flush_error_on_close() for each I/O layer, and for each
    # layer open the file three ways: by name, by a descriptor the file
    # object owns, and by a descriptor it merely borrows (closefd=False).
    layers = (
        ('wb', {'buffering': 0}),  # raw file
        ('wb', {}),                # buffered io
        ('w', {}),                 # text io
    )
    flags = os.O_WRONLY | os.O_CREAT
    for mode, extra in layers:
        self.check_flush_error_on_close(support.TESTFN, mode, **extra)

        fd = os.open(support.TESTFN, flags)
        self.check_flush_error_on_close(fd, mode, **extra)

        fd = os.open(support.TESTFN, flags)
        try:
            self.check_flush_error_on_close(fd, mode, closefd=False,
                                            **extra)
        finally:
            # The file object does not own this descriptor, so it must be
            # released here even if the check above fails; otherwise the
            # descriptor would leak into later tests.
            os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000790
def test_multi_close(self):
    # close() may be called repeatedly without error; other operations on
    # the closed file must still raise ValueError.
    raw = self.open(support.TESTFN, "wb", buffering=0)
    for _ in range(3):
        raw.close()
    with self.assertRaises(ValueError):
        raw.flush()
797
def test_RawIOBase_read(self):
    # Exercise the default RawIOBase.read() implementation (which calls
    # readinto() internally).
    rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
    # Result expected from each successive read(2) call, in order.
    expected_chunks = (b"ab", b"c", b"d", None, b"ef", b"g", None, b"")
    for expected in expected_chunks:
        self.assertEqual(rawio.read(2), expected)
810
def test_types_have_dict(self):
    # Instances of the base classes and the in-memory streams must all
    # carry an instance __dict__.
    factories = (
        self.IOBase,
        self.RawIOBase,
        self.TextIOBase,
        self.StringIO,
        self.BytesIO,
    )
    instances = tuple(factory() for factory in factories)
    for instance in instances:
        self.assertTrue(hasattr(instance, "__dict__"))
821
def test_opener(self):
    # A custom opener decides which descriptor backs the file: the path
    # given to open() does not exist, yet the content of TESTFN is read.
    with self.open(support.TESTFN, "w") as writer:
        writer.write("egg\n")
    fd = os.open(support.TESTFN, os.O_RDONLY)

    def use_existing_fd(path, flags):
        return fd

    with self.open("non-existent", "r", opener=use_existing_fd) as reader:
        self.assertEqual(reader.read(), "egg\n")
830
def test_bad_opener_negative_1(self):
    # Issue #27066: an opener returning -1 must produce a ValueError whose
    # message names the returned value.
    # NOTE(review): this calls the builtin open() rather than self.open --
    # confirm that is intentional.
    def returns_minus_one(fname, flags):
        return -1

    with self.assertRaises(ValueError) as caught:
        open('non-existent', 'r', opener=returns_minus_one)
    message = str(caught.exception)
    self.assertEqual(message, 'opener returned -1')
838
def test_bad_opener_other_negative(self):
    # Issue #27066: same check as for -1, but with another negative
    # descriptor value.
    # NOTE(review): this calls the builtin open() rather than self.open --
    # confirm that is intentional.
    def returns_minus_two(fname, flags):
        return -2

    with self.assertRaises(ValueError) as caught:
        open('non-existent', 'r', opener=returns_minus_two)
    message = str(caught.exception)
    self.assertEqual(message, 'opener returned -2')
846
def test_fileio_closefd(self):
    # Issue #4841
    with self.open(__file__, 'rb') as first:
        with self.open(__file__, 'rb') as second:
            borrowed = self.FileIO(first.fileno(), closefd=False)
            # .__init__() must not close first: re-initialise the FileIO on
            # the other descriptor, then prove first is still usable.
            borrowed.__init__(second.fileno(), closefd=False)
            first.readline()
            # .close() must not close second
            borrowed.close()
            second.readline()
858
def test_nonbuffered_textio(self):
    # Text mode cannot be unbuffered; the failed open() must raise
    # ValueError without emitting a ResourceWarning.
    with support.check_no_resource_warning(self), \
         self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300863
def test_invalid_newline(self):
    # An unsupported newline argument must raise ValueError without
    # emitting a ResourceWarning.
    with support.check_no_resource_warning(self), \
         self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300868
def test_buffered_readinto_mixin(self):
    # Test the implementation provided by BufferedIOBase
    class Stream(self.BufferedIOBase):
        # Only read()/read1() are supplied; readinto()/readinto1() must
        # come from the base class.
        def read(self, size):
            return b"12345"
        read1 = read

    stream = Stream()
    for method in ("readinto", "readinto1"):
        with self.subTest(method):
            target = byteslike(5)
            fill = getattr(stream, method)
            self.assertEqual(fill(target), 5)
            self.assertEqual(bytes(target), b"12345")
881
def test_fspath_support(self):
    # open() must accept objects implementing __fspath__ that return
    # either str or bytes, and reject ones returning anything else.
    class PathLike:
        def __init__(self, path):
            self.path = path

        def __fspath__(self):
            return self.path

    def check_path_succeeds(path):
        # Round-trip a line of text through the given path-like object.
        with self.open(path, "w") as writer:
            writer.write("egg\n")

        with self.open(path, "r") as reader:
            self.assertEqual(reader.read(), "egg\n")

    str_name = support.TESTFN
    bytes_name = support.TESTFN.encode('utf-8')
    for name in (str_name, bytes_name):
        check_path_succeeds(PathLike(name))

    # __fspath__ returning a non-path object (here: a class) is an error.
    bad_path = PathLike(TypeError)
    with self.assertRaises(TypeError):
        self.open(bad_path, 'w')

    # ensure that refcounting is correct with some error conditions
    with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
        self.open(PathLike(support.TESTFN), 'rwxa')
907
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200908
class CIOTest(IOTest):
    # IOTest cases plus a regression test specific to the C implementation.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        cyclic = MyIO()
        cyclic.obj = cyclic
        ref = weakref.ref(cyclic)
        # Drop the class first, then the instance, so that both are only
        # reachable through the cycle when the collector runs.
        del MyIO
        del cyclic
        support.gc_collect()
        self.assertIsNone(ref(), ref)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000928
# Counterpart of CIOTest: reuses every IOTest case unchanged and adds none of
# its own (presumably bound to the pure-Python implementation elsewhere in
# this file -- confirm).
class PyIOTest(IOTest):
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000931
Guido van Rossuma9e20242007-03-08 00:43:48 +0000932
@support.cpython_only
class APIMismatchTest(unittest.TestCase):
    # Checks, in both directions, that pyio.RawIOBase and io.RawIOBase
    # expose the same API according to support.detect_api_mismatch().

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        missing = support.detect_api_mismatch(
            pyio.RawIOBase,
            io.RawIOBase,
            ignore=('__weakref__',),
        )
        self.assertEqual(
            missing,
            set(),
            msg='Python RawIOBase does not have all C RawIOBase methods',
        )

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        missing = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        self.assertEqual(
            missing,
            set(),
            msg='C RawIOBase does not have all Python RawIOBase methods',
        )
946
947
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # Mixin: the class under test is read from ``self.tp`` and the raw
    # stream doubles (MockRawIO, CloseFailureIO, MockUnseekableIO) from
    # attributes of the same names on ``self``.

    def test_detach(self):
        # detach() hands back the raw stream exactly once.
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        self.assertIs(buffered.detach(), raw)
        with self.assertRaises(ValueError):
            buffered.detach()

        repr(buffered)  # Should still work

    def test_fileno(self):
        # fileno() is forwarded to the raw stream.
        buffered = self.tp(self.MockRawIO())

        self.assertEqual(42, buffered.fileno())

    def test_invalid_args(self):
        buffered = self.tp(self.MockRawIO())
        # Invalid whence
        for whence in (-1, 9):
            self.assertRaises(ValueError, buffered.seek, 0, whence)

    def test_override_destructor(self):
        # A subclass may override __del__, close() and flush(); destroying
        # the object must invoke them in that order.
        base = self.tp
        calls = []

        class MyBufferedIO(base):
            def __del__(self):
                calls.append(1)
                try:
                    inherited = super().__del__
                except AttributeError:
                    pass
                else:
                    inherited()

            def close(self):
                calls.append(2)
                super().close()

            def flush(self):
                calls.append(3)
                super().flush()

        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        del bufio
        support.gc_collect()
        self.assertEqual(calls, [1, 2, 3])

    def test_context_manager(self):
        # Test usability as a context manager
        buffered = self.tp(self.MockRawIO())

        def enter_and_exit():
            with buffered:
                pass

        enter_and_exit()
        # buffered should now be closed, and using it a second time should
        # raise a ValueError.
        self.assertRaises(ValueError, enter_and_exit)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()

        def touch_missing_attribute():
            self.tp(rawio).xyzzy

        with support.captured_output("stderr") as captured:
            self.assertRaises(AttributeError, touch_missing_attribute)
        output = captured.getvalue().strip()
        if output:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(output.splitlines()), 1)
            self.assertTrue(output.startswith("Exception OSError: "), output)
            self.assertTrue(output.endswith(" ignored"), output)

    def test_repr(self):
        # repr() shows the qualified class name and, once the raw stream
        # has one, its name (str or bytes).
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
        self.assertEqual(repr(buffered), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(buffered), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(buffered), "<%s name=b'dummy'>" % clsname)

    def test_recursive_repr(self):
        # Issue #25455
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        # Make the raw stream's name the buffered object itself, so that
        # repr() recurses into itself.
        with support.swap_attr(raw, 'name', buffered):
            try:
                repr(buffered)  # Should not crash
            except RuntimeError:
                pass

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw = self.MockRawIO()
        closed_at_flush = []

        def failing_flush():
            closed_at_flush[:] = [buffered.closed, raw.closed]
            raise OSError()

        raw.flush = failing_flush
        buffered = self.tp(raw)
        with self.assertRaises(OSError):  # exception not swallowed
            buffered.close()
        self.assertTrue(buffered.closed)
        self.assertTrue(raw.closed)
        self.assertTrue(closed_at_flush)      # flush() called
        self.assertFalse(closed_at_flush[0])  # flush() called before file closed
        self.assertFalse(closed_at_flush[1])
        raw.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and the raw close() fail, the close() error is
        # raised with the flush() error chained as its __context__.
        raw = self.MockRawIO()

        def failing_flush():
            raise OSError('flush')

        def failing_close():
            raise OSError('close')

        raw.close = failing_close
        buffered = self.tp(raw)
        buffered.flush = failing_flush
        with self.assertRaises(OSError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertEqual(error.args, ('close',))
        self.assertIsInstance(error.__context__, OSError)
        self.assertEqual(error.__context__.args, ('flush',))
        self.assertFalse(buffered.closed)

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        raw = self.MockRawIO()

        # Both replacements fail with a NameError by raising a name that
        # does not exist.
        def failing_flush():
            raise non_existing_flush

        def failing_close():
            raise non_existing_close

        raw.close = failing_close
        buffered = self.tp(raw)
        buffered.flush = failing_flush
        with self.assertRaises(NameError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertIn('non_existing_close', str(error))
        self.assertIsInstance(error.__context__, NameError)
        self.assertIn('non_existing_flush', str(error.__context__))
        self.assertFalse(buffered.closed)

    def test_multi_close(self):
        # Repeated close() calls are harmless; flush() afterwards fails.
        buffered = self.tp(self.MockRawIO())
        for _ in range(3):
            buffered.close()
        with self.assertRaises(ValueError):
            buffered.flush()

    def test_unseekable(self):
        # tell() and seek() on an unseekable raw stream are unsupported.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        unsupported = self.UnsupportedOperation
        self.assertRaises(unsupported, bufio.tell)
        self.assertRaises(unsupported, bufio.seek, 0)

    def test_readonly_attributes(self):
        # The ``raw`` attribute cannot be rebound.
        buffered = self.tp(self.MockRawIO())
        replacement = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buffered.raw = replacement
1113
Guido van Rossum78892e42007-04-06 17:31:18 +00001114
class SizeofTest:
    # Mixin checking how sys.getsizeof() accounts for the internal buffer
    # of the buffered class under test (``self.tp``).

    @support.cpython_only
    def test_sizeof(self):
        # The reported size must grow by exactly the buffer size.
        small, large = 4096, 8192
        small_buffered = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(small_buffered) - small
        large_buffered = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(large_buffered), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the buffer no longer counts towards the size.
        bufsize = 4096
        buffered = self.tp(self.MockRawIO(), buffer_size=bufsize)
        overhead = sys.getsizeof(buffered) - bufsize
        buffered.close()
        self.assertEqual(sys.getsizeof(buffered), overhead)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001136
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Tests for buffered readers. The class under test is ``self.tp``; the
    # raw stream doubles (MockRawIO, MockFileIO, MockUnseekableIO,
    # MisbehavedRawIO) are read from attributes of the same names on self.
    read_mode = "rb"

    def test_constructor(self):
        # __init__ may be called repeatedly; invalid buffer sizes are
        # rejected and the object can be re-initialised afterwards.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_uninitialized(self):
        # An object created with __new__ only can be deleted safely, fails
        # cleanly on use, and works once __init__ has run.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.read, 0)
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.read(0), b'')

    def test_read(self):
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        # (size requested, data expected, raw reads issued so far)
        steps = (
            (1, b"b", 1),
            (100, b"c", 1),
            (100, b"d", 2),
            (100, b"efg", 3),
            (100, b"", 4),
        )
        for size, expected, raw_reads in steps:
            self.assertEqual(expected, bufio.read1(size))
            self.assertEqual(rawio._reads, raw_reads)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        # (bytes reported, buffer content afterwards); a short read leaves
        # the tail of the buffer untouched.
        for count, content in ((2, b"ab"), (2, b"cd"), (2, b"ef"),
                               (1, b"gf"), (0, b"gf")):
            self.assertEqual(bufio.readinto(b), count)
            self.assertEqual(b, content)
        rawio = self.MockRawIO((b"abc", None))
        bufio = self.tp(rawio)
        for count, content in ((2, b"ab"), (1, b"cb")):
            self.assertEqual(bufio.readinto(b), count)
            self.assertEqual(b, content)

    def test_readinto1(self):
        buffer_size = 10
        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
        bufio = self.tp(rawio, buffer_size=buffer_size)
        b = bytearray(2)
        self.assertEqual(bufio.peek(3), b'abc')
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 1)
        self.assertEqual(b[:1], b"c")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"de")
        self.assertEqual(rawio._reads, 2)
        b = bytearray(2*buffer_size)
        self.assertEqual(bufio.peek(3), b'fgh')
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(bufio.readinto1(b), 6)
        self.assertEqual(b[:6], b"fghjkl")
        self.assertEqual(rawio._reads, 4)

    def _check_readinto_array(self, method_name):
        # Shared body of test_readinto_array / test_readinto1_array:
        # *method_name* is the buffered-reader method to exercise.
        buffer_size = 60
        data = b"a" * 26
        rawio = self.MockRawIO((data,))
        bufio = self.tp(rawio, buffer_size=buffer_size)

        # Create an array with element size > 1 byte
        b = array.array('i', b'x' * 32)
        # Checked with a unittest assertion rather than a bare ``assert`` so
        # that the precondition is still verified under ``python -O``.
        self.assertNotEqual(len(b), 16)

        # Read into it. We should get as many *bytes* as we can fit into b
        # (which is more than the number of elements)
        n = getattr(bufio, method_name)(b)
        self.assertGreater(n, len(b))

        # Check that old contents of b are preserved
        bm = memoryview(b).cast('B')
        self.assertLess(n, len(bm))
        self.assertEqual(bm[:n], data[:n])
        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))

    def test_readinto_array(self):
        self._check_readinto_array("readinto")

    def test_readinto1_array(self):
        self._check_readinto_array("readinto1")

    def test_readlines(self):
        def bufio():
            # Fresh reader for every call, since readlines() consumes it.
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes passed to the buffered read(),
        # sizes expected to reach the raw read().
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        # Asking for more than is available returns what there is.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(values)
            with self.open(support.TESTFN, "wb") as f:
                f.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []

                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=reader) for _ in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02)  # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                for i in range(256):
                    self.assertEqual(combined.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # tell()/seek() stay unsupported both before and after reading.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        bufio.read(1)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        self.assertRaises(self.UnsupportedOperation, bufio.tell)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))

    def test_read_on_closed(self):
        # Issue #23796
        # NOTE(review): uses io.BufferedReader directly rather than self.tp
        # -- confirm that is intentional.
        b = io.BufferedReader(io.BytesIO(b"12"))
        b.read(1)
        b.close()
        self.assertRaises(ValueError, b.peek)
        self.assertRaises(ValueError, b.read1, 1)
1412
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001413
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # BufferedReaderTest and SizeofTest run against io.BufferedReader, plus
    # checks that only apply to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors,
                          bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed __init__ the object must refuse to read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        with self.assertRaises(OSError):
            bufio.read(10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            buffered = self.tp(rawio)
            # Self-reference: only a gc pass can reclaim the object.
            buffered.f = buffered
            ref = weakref.ref(buffered)
            del buffered
            support.gc_collect()
        self.assertIsNone(ref(), ref)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1460
1461
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared BufferedReader tests against the Python implementation.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001464
Guido van Rossuma9e20242007-03-08 00:43:48 +00001465
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Implementation-independent tests for buffered writers.  Subclasses set
    # ``tp`` to the concrete type under test.
    write_mode = "wb"

    def test_constructor(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-initializing with valid buffer sizes must keep the object usable.
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        # Non-positive buffer sizes are rejected.
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_uninitialized(self):
        # Destroying an object that was never initialized must be harmless.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.write, b'')
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.write(b''), 0)

    def test_detach_flush(self):
        # detach() must push buffered data to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)
        buffer = bytearray(b"def")
        bufio.write(buffer)
        buffer[:] = b"***"  # Overwrite our copy of the data
        bufio.flush()
        self.assertEqual(b"".join(writer._write_stack), b"abcdef")

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for start in range(0, len(contents), 3):
            bufio.write(contents[start:start + 3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # ``intermediate_func(bufio)`` is called after every single write.
        contents = bytes(range(256)) * 1000
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)

        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for _ in range(15):
                    yield size

        sizes = gen_sizes()
        n = 0
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n + size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seek away from the current position and back, with absolute ...
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)

        # ... and with relative offsets.
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
                         b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        lines = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(lines)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        lines = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(lines)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        for bad_lines in ([1, 2, 3], None, 'abc'):
            self.assertRaises(TypeError, bufio.writelines, bad_lines)

    def test_destructor(self):
        # Destroying the writer must flush what it still buffers.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        #
        # The test creates TESTFN on disk; register its removal up front so
        # the file never outlives the test, whatever the outcome.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n + size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []

                def write_queued_chunks():
                    # Drain the shared queue; record any failure so the main
                    # thread can report it.
                    try:
                        while True:
                            try:
                                chunk = queue.popleft()
                            except IndexError:
                                return
                            bufio.write(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=write_queued_chunks)
                           for _ in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02)  # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # The interleaving is arbitrary, but every byte value must have
            # been written exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)
        self.assertRaises(OSError, bufio.write, b"abcdef")

    def test_max_buffer_size_removal(self):
        # A third positional argument is no longer accepted.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()

        def bad_write(b):
            raise OSError()

        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(OSError, b.close)  # exception not swallowed
        self.assertTrue(b.closed)
1725
Benjamin Peterson59406a92009-03-26 17:10:29 +00001726
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared BufferedWriter tests against io.BufferedWriter and adds
    # a few checks that are only made for this implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                              bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # Every rejected buffer size must leave the object unusable: the
        # write() following each failed __init__() has to fail as well.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # Opening the FileIO below creates TESTFN on disk; register its
        # removal up front so the file never outlives the test, whatever the
        # outcome.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            f.write(b"123xxx")
            f.x = f  # reference cycle through the object itself
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275: the TypeError raised for too many arguments must
        # mention the type name.
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1770
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001771
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriter tests against the Python implementation.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001774
class BufferedRWPairTest(unittest.TestCase):
    # Implementation-independent tests for BufferedRWPair.  Subclasses set
    # ``tp`` to the concrete type under test.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_uninitialized(self):
        # Destroying an object that was never initialized must be harmless.
        pair = self.tp.__new__(self.tp)
        del pair
        pair = self.tp.__new__(self.tp)
        # Both directions must refuse to work before __init__() has run.
        for method_name, argument in (("read", 0), ("write", b'')):
            self.assertRaisesRegex((ValueError, AttributeError),
                                   'uninitialized|has no attribute',
                                   getattr(pair, method_name), argument)
        pair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(pair.read(0), b'')
        self.assertEqual(pair.write(b''), 0)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        with self.assertRaises(self.UnsupportedOperation):
            pair.detach()

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument is no longer accepted.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        with self.assertRaises(OSError):
            self.tp(NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        with self.assertRaises(OSError):
            self.tp(self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        # read(None) returns everything that is available.
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        def new_pair():
            # Every assertion gets its own pair over the same three lines.
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

        self.assertEqual(new_pair().readlines(), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(new_pair().readlines(), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(new_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        for method in ("readinto", "readinto1"):
            with self.subTest(method):
                pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

                data = byteslike(b'\0' * 5)
                fill = getattr(pair, method)
                self.assertEqual(fill(data), 5)
                self.assertEqual(bytes(data), b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        buffer = bytearray(b"def")
        pair.write(buffer)
        buffer[:] = b"***"  # Overwrite our copy of the data
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() must not consume what it returns.
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_reader_close_error_on_close(self):
        def failing_reader_close():
            # Deliberately undefined name: evaluating it raises NameError.
            reader_non_existing

        reader = self.MockRawIO()
        reader.close = failing_reader_close
        writer = self.MockRawIO()
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        self.assertIn('reader_non_existing', str(err.exception))
        self.assertTrue(pair.closed)
        self.assertFalse(reader.closed)
        self.assertTrue(writer.closed)

    def test_writer_close_error_on_close(self):
        def failing_writer_close():
            # Deliberately undefined name: evaluating it raises NameError.
            writer_non_existing

        reader = self.MockRawIO()
        writer = self.MockRawIO()
        writer.close = failing_writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        self.assertIn('writer_non_existing', str(err.exception))
        self.assertFalse(pair.closed)
        self.assertTrue(reader.closed)
        self.assertFalse(writer.closed)

    def test_reader_writer_close_error_on_close(self):
        def failing_reader_close():
            # Deliberately undefined name: evaluating it raises NameError.
            reader_non_existing

        def failing_writer_close():
            # Deliberately undefined name: evaluating it raises NameError.
            writer_non_existing

        reader = self.MockRawIO()
        reader.close = failing_reader_close
        writer = self.MockRawIO()
        writer.close = failing_writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        # The reader's error is the one raised; the writer's error is
        # chained to it as its context.
        self.assertIn('reader_non_existing', str(err.exception))
        self.assertIsInstance(err.exception.__context__, NameError)
        self.assertIn('writer_non_existing', str(err.exception.__context__))
        self.assertFalse(pair.closed)
        self.assertFalse(reader.closed)
        self.assertFalse(writer.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair is not a tty only when neither side is one ...
        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        # ... and is a tty as soon as either side is.
        for reader_tty, writer_tty in ((True, False),
                                       (False, True),
                                       (True, True)):
            pair = self.tp(SelectableIsAtty(reader_tty),
                           SelectableIsAtty(writer_tty))
            self.assertTrue(pair.isatty())

    def test_weakref_clearing(self):
        brw = self.tp(self.MockRawIO(), self.MockRawIO())
        ref = weakref.ref(brw)
        brw = None
        ref = None  # Shouldn't segfault.
1962
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPair tests against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001965
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPair tests against the Python implementation.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001968
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001969
1970class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1971 read_mode = "rb+"
1972 write_mode = "wb+"
1973
1974 def test_constructor(self):
1975 BufferedReaderTest.test_constructor(self)
1976 BufferedWriterTest.test_constructor(self)
1977
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001978 def test_uninitialized(self):
1979 BufferedReaderTest.test_uninitialized(self)
1980 BufferedWriterTest.test_uninitialized(self)
1981
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001982 def test_read_and_write(self):
1983 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001984 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001985
1986 self.assertEqual(b"as", rw.read(2))
1987 rw.write(b"ddd")
1988 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001989 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001990 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001991 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001992
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001993 def test_seek_and_tell(self):
1994 raw = self.BytesIO(b"asdfghjkl")
1995 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001996
Ezio Melottib3aedd42010-11-20 19:04:17 +00001997 self.assertEqual(b"as", rw.read(2))
1998 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001999 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002000 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00002001
Antoine Pitroue05565e2011-08-20 14:39:23 +02002002 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00002003 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02002004 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002005 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002006 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002007 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002008 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002009 self.assertEqual(7, rw.tell())
2010 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02002011 rw.flush()
2012 self.assertEqual(b"asdf123fl", raw.getvalue())
2013
Christian Heimes8e42a0a2007-11-08 18:04:45 +00002014 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00002015
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002016 def check_flush_and_read(self, read_func):
2017 raw = self.BytesIO(b"abcdefghi")
2018 bufio = self.tp(raw)
2019
Ezio Melottib3aedd42010-11-20 19:04:17 +00002020 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002021 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002022 self.assertEqual(b"ef", read_func(bufio, 2))
2023 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002024 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002025 self.assertEqual(6, bufio.tell())
2026 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002027 raw.seek(0, 0)
2028 raw.write(b"XYZ")
2029 # flush() resets the read buffer
2030 bufio.flush()
2031 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002032 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002033
2034 def test_flush_and_read(self):
2035 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2036
2037 def test_flush_and_readinto(self):
2038 def _readinto(bufio, n=-1):
2039 b = bytearray(n if n >= 0 else 9999)
2040 n = bufio.readinto(b)
2041 return bytes(b[:n])
2042 self.check_flush_and_read(_readinto)
2043
2044 def test_flush_and_peek(self):
2045 def _peek(bufio, n=-1):
2046 # This relies on the fact that the buffer can contain the whole
2047 # raw stream, otherwise peek() can return less.
2048 b = bufio.peek(n)
2049 if n != -1:
2050 b = b[:n]
2051 bufio.seek(len(b), 1)
2052 return b
2053 self.check_flush_and_read(_peek)
2054
2055 def test_flush_and_write(self):
2056 raw = self.BytesIO(b"abcdefghi")
2057 bufio = self.tp(raw)
2058
2059 bufio.write(b"123")
2060 bufio.flush()
2061 bufio.write(b"45")
2062 bufio.flush()
2063 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002064 self.assertEqual(b"12345fghi", raw.getvalue())
2065 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002066
2067 def test_threads(self):
2068 BufferedReaderTest.test_threads(self)
2069 BufferedWriterTest.test_threads(self)
2070
2071 def test_writes_and_peek(self):
2072 def _peek(bufio):
2073 bufio.peek(1)
2074 self.check_writes(_peek)
2075 def _peek(bufio):
2076 pos = bufio.tell()
2077 bufio.seek(-1, 1)
2078 bufio.peek(1)
2079 bufio.seek(pos, 0)
2080 self.check_writes(_peek)
2081
2082 def test_writes_and_reads(self):
2083 def _read(bufio):
2084 bufio.seek(-1, 1)
2085 bufio.read(1)
2086 self.check_writes(_read)
2087
2088 def test_writes_and_read1s(self):
2089 def _read1(bufio):
2090 bufio.seek(-1, 1)
2091 bufio.read1(1)
2092 self.check_writes(_read1)
2093
2094 def test_writes_and_readintos(self):
2095 def _read(bufio):
2096 bufio.seek(-1, 1)
2097 bufio.readinto(bytearray(1))
2098 self.check_writes(_read)
2099
Antoine Pitroua0ceb732009-08-06 20:29:56 +00002100 def test_write_after_readahead(self):
2101 # Issue #6629: writing after the buffer was filled by readahead should
2102 # first rewind the raw stream.
2103 for overwrite_size in [1, 5]:
2104 raw = self.BytesIO(b"A" * 10)
2105 bufio = self.tp(raw, 4)
2106 # Trigger readahead
2107 self.assertEqual(bufio.read(1), b"A")
2108 self.assertEqual(bufio.tell(), 1)
2109 # Overwriting should rewind the raw stream if it needs so
2110 bufio.write(b"B" * overwrite_size)
2111 self.assertEqual(bufio.tell(), overwrite_size + 1)
2112 # If the write size was smaller than the buffer size, flush() and
2113 # check that rewind happens.
2114 bufio.flush()
2115 self.assertEqual(bufio.tell(), overwrite_size + 1)
2116 s = raw.getvalue()
2117 self.assertEqual(s,
2118 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2119
Antoine Pitrou7c404892011-05-13 00:13:33 +02002120 def test_write_rewind_write(self):
2121 # Various combinations of reading / writing / seeking backwards / writing again
2122 def mutate(bufio, pos1, pos2):
2123 assert pos2 >= pos1
2124 # Fill the buffer
2125 bufio.seek(pos1)
2126 bufio.read(pos2 - pos1)
2127 bufio.write(b'\x02')
2128 # This writes earlier than the previous write, but still inside
2129 # the buffer.
2130 bufio.seek(pos1)
2131 bufio.write(b'\x01')
2132
2133 b = b"\x80\x81\x82\x83\x84"
2134 for i in range(0, len(b)):
2135 for j in range(i, len(b)):
2136 raw = self.BytesIO(b)
2137 bufio = self.tp(raw, 100)
2138 mutate(bufio, i, j)
2139 bufio.flush()
2140 expected = bytearray(b)
2141 expected[j] = 2
2142 expected[i] = 1
2143 self.assertEqual(raw.getvalue(), expected,
2144 "failed result for i=%d, j=%d" % (i, j))
2145
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00002146 def test_truncate_after_read_or_write(self):
2147 raw = self.BytesIO(b"A" * 10)
2148 bufio = self.tp(raw, 100)
2149 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2150 self.assertEqual(bufio.truncate(), 2)
2151 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2152 self.assertEqual(bufio.truncate(), 4)
2153
def test_misbehaved_io(self):
    # Run the reader-side and then the writer-side misbehaved-I/O checks
    # against this test case.
    for base in (BufferedReaderTest, BufferedWriterTest):
        base.test_misbehaved_io(self)
2157
Antoine Pitroue05565e2011-08-20 14:39:23 +02002158 def test_interleaved_read_write(self):
2159 # Test for issue #12213
2160 with self.BytesIO(b'abcdefgh') as raw:
2161 with self.tp(raw, 100) as f:
2162 f.write(b"1")
2163 self.assertEqual(f.read(1), b'b')
2164 f.write(b'2')
2165 self.assertEqual(f.read1(1), b'd')
2166 f.write(b'3')
2167 buf = bytearray(1)
2168 f.readinto(buf)
2169 self.assertEqual(buf, b'f')
2170 f.write(b'4')
2171 self.assertEqual(f.peek(1), b'h')
2172 f.flush()
2173 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2174
2175 with self.BytesIO(b'abc') as raw:
2176 with self.tp(raw, 100) as f:
2177 self.assertEqual(f.read(1), b'a')
2178 f.write(b"2")
2179 self.assertEqual(f.read(1), b'c')
2180 f.flush()
2181 self.assertEqual(raw.getvalue(), b'a2c')
2182
2183 def test_interleaved_readline_write(self):
2184 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2185 with self.tp(raw) as f:
2186 f.write(b'1')
2187 self.assertEqual(f.readline(), b'b\n')
2188 f.write(b'2')
2189 self.assertEqual(f.readline(), b'def\n')
2190 f.write(b'3')
2191 self.assertEqual(f.readline(), b'\n')
2192 f.flush()
2193 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2194
# A BufferedRandom cannot be constructed over a non-seekable stream, so
# the name is bound to None instead of a test function.
test_unseekable = None
2197
R David Murray67bfe802013-02-23 21:51:05 -05002198
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Type under test for the shared BufferedRandom test methods.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        if sys.maxsize <= 0x7FFFFFFF:
            # The allocation can succeed on 32-bit builds, e.g. with more
            # than 2GB RAM and a 64-bit kernel, so there is nothing to
            # check there.
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            bufio.__init__(rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Reuse the reader and then the writer garbage-collection checks.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275: the TypeError message must mention the type name.
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            raw = io.BytesIO()
            self.tp(raw, 1024, 1024, 1024)
2220
2221
class PyBufferedRandomTest(BufferedRandomTest):
    # Run the shared BufferedRandom tests against pyio's implementation.
    tp = pyio.BufferedRandom
2224
2225
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002226# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2227# properties:
2228# - A single output character can correspond to many bytes of input.
2229# - The number of input bytes to complete the character can be
2230# undetermined until the last input byte is received.
2231# - The number of input bytes can vary depending on previous input.
2232# - A single input byte can correspond to many characters of output.
2233# - The number of output characters can be undetermined until the
2234# last input byte is received.
2235# - The number of output characters can vary depending on previous input.
2236
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  A new I or O applies to the words
    that follow the control word.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Restore the initial state: I = O = 1 and an empty word buffer."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (pending input bytes, integer flags) for this decoder.

        I and O are XOR-ed with 1 before being packed as I*100 + O, so
        that the flags value is 0 right after reset().
        """
        i, o = self.i ^ 1, self.o ^ 1
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Decode the bytes in *input* and return the text produced.

        Bytes that do not yet complete a word stay buffered; when *final*
        is true, a non-empty buffer is flushed as the last word.
        """
        pieces = []
        period = ord('.')
        for b in input:
            if self.i == 0:  # variable-length, terminated with period
                if b != period:
                    self.buffer.append(b)
                elif self.buffer:  # a period with nothing buffered is ignored
                    pieces.append(self.process_word())
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        Control words ('i...' / 'o...') update I or O and produce no
        output.  The buffer is emptied in every case.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o  # pad out with hyphens
            if self.o:
                output = output[:self.o]  # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # The codec lookup function below only answers while this is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the 'test_decoder' codec.

        Returns a CodecInfo that encodes with latin-1 and decodes
        incrementally with this class, or None when the codec is disabled
        or *name* is a different codec.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
2321
# Hook the decoder above into the codec registry for testing.  The lookup
# function answers only while codecEnabled is true, so the codec stays
# disabled by default; tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2325
2326
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Sanity checks for the StatefulIncrementalDecoder test helper.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.'),
    ]

    def test_decoder(self):
        # Decode every canned case in one shot with a fresh decoder.
        for data, final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, final), expected)

        # An unfinished decode yields nothing until EOF is forced.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', True), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002369
2370class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002371
def setUp(self):
    # Sample bytes mixing \r\n, \r and \n line endings, and the same
    # text with every line ending normalized to \n.
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = "AAA\nBBB\nCCC\nDDD\nEEE\n"
    # Remove any test file left behind before the test runs.
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002376
def tearDown(self):
    # Remove the scratch file the test may have created.
    scratch = support.TESTFN
    support.unlink(scratch)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002379
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002380 def test_constructor(self):
2381 r = self.BytesIO(b"\xc3\xa9\n\n")
2382 b = self.BufferedReader(r, 1000)
2383 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002384 t.__init__(b, encoding="latin-1", newline="\r\n")
2385 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002386 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002387 t.__init__(b, encoding="utf-8", line_buffering=True)
2388 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002389 self.assertEqual(t.line_buffering, True)
2390 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002391 self.assertRaises(TypeError, t.__init__, b, newline=42)
2392 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2393
def test_uninitialized(self):
    wrapper_type = self.TextIOWrapper
    # Creating and dropping an instance that never ran __init__ must be
    # harmless.
    t = wrapper_type.__new__(wrapper_type)
    del t
    t = wrapper_type.__new__(wrapper_type)
    self.assertRaises(Exception, repr, t)
    with self.assertRaisesRegex((ValueError, AttributeError),
                                'uninitialized|has no attribute'):
        t.read(0)
    # After a late __init__ the object works normally.
    t.__init__(self.MockRawIO())
    self.assertEqual(t.read(0), '')
2404
Nick Coghlana9b15242014-02-04 22:11:18 +10002405 def test_non_text_encoding_codecs_are_rejected(self):
2406 # Ensure the constructor complains if passed a codec that isn't
2407 # marked as a text encoding
2408 # http://bugs.python.org/issue20404
2409 r = self.BytesIO()
2410 b = self.BufferedWriter(r)
2411 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2412 self.TextIOWrapper(b, encoding="hex")
2413
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002414 def test_detach(self):
2415 r = self.BytesIO()
2416 b = self.BufferedWriter(r)
2417 t = self.TextIOWrapper(b)
2418 self.assertIs(t.detach(), b)
2419
2420 t = self.TextIOWrapper(b, encoding="ascii")
2421 t.write("howdy")
2422 self.assertFalse(r.getvalue())
2423 t.detach()
2424 self.assertEqual(r.getvalue(), b"howdy")
2425 self.assertRaises(ValueError, t.detach)
2426
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002427 # Operations independent of the detached stream should still work
2428 repr(t)
2429 self.assertEqual(t.encoding, "ascii")
2430 self.assertEqual(t.errors, "strict")
2431 self.assertFalse(t.line_buffering)
2432
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002433 def test_repr(self):
2434 raw = self.BytesIO("hello".encode("utf-8"))
2435 b = self.BufferedReader(raw)
2436 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002437 modname = self.TextIOWrapper.__module__
2438 self.assertEqual(repr(t),
2439 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2440 raw.name = "dummy"
2441 self.assertEqual(repr(t),
2442 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002443 t.mode = "r"
2444 self.assertEqual(repr(t),
2445 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002446 raw.name = b"dummy"
2447 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002448 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002449
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002450 t.buffer.detach()
2451 repr(t) # Should not raise an exception
2452
def test_recursive_repr(self):
    # Issue #25455: the raw stream's name refers back to the wrapper, so
    # repr() recurses into itself.
    raw_stream = self.BytesIO()
    wrapper = self.TextIOWrapper(raw_stream)
    with support.swap_attr(raw_stream, 'name', wrapper):
        try:
            repr(wrapper)  # Should not crash
        except RuntimeError:
            # A RuntimeError is an acceptable outcome.
            pass
2462
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002463 def test_line_buffering(self):
2464 r = self.BytesIO()
2465 b = self.BufferedWriter(r, 1000)
2466 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002467 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002468 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002469 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002470 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002471 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002472 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002473
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002474 def test_default_encoding(self):
2475 old_environ = dict(os.environ)
2476 try:
2477 # try to get a user preferred encoding different than the current
2478 # locale encoding to check that TextIOWrapper() uses the current
2479 # locale encoding and not the user preferred encoding
2480 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2481 if key in os.environ:
2482 del os.environ[key]
2483
2484 current_locale_encoding = locale.getpreferredencoding(False)
2485 b = self.BytesIO()
2486 t = self.TextIOWrapper(b)
2487 self.assertEqual(t.encoding, current_locale_encoding)
2488 finally:
2489 os.environ.clear()
2490 os.environ.update(old_environ)
2491
@support.cpython_only
def test_device_encoding(self):
    # Issue 15989: a fileno() result just above INT_MAX or UINT_MAX must
    # raise OverflowError.
    import _testcapi
    b = self.BytesIO()
    for limit in (_testcapi.INT_MAX, _testcapi.UINT_MAX):
        b.fileno = lambda limit=limit: limit + 1
        with self.assertRaises(OverflowError):
            self.TextIOWrapper(b)
2501
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002502 def test_encoding(self):
2503 # Check the encoding attribute is always set, and valid
2504 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002505 t = self.TextIOWrapper(b, encoding="utf-8")
2506 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002507 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002508 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002509 codecs.lookup(t.encoding)
2510
2511 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002512 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002513 b = self.BytesIO(b"abc\n\xff\n")
2514 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002515 self.assertRaises(UnicodeError, t.read)
2516 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002517 b = self.BytesIO(b"abc\n\xff\n")
2518 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002519 self.assertRaises(UnicodeError, t.read)
2520 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002521 b = self.BytesIO(b"abc\n\xff\n")
2522 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002523 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002524 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002525 b = self.BytesIO(b"abc\n\xff\n")
2526 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002527 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002528
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002529 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002530 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002531 b = self.BytesIO()
2532 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002533 self.assertRaises(UnicodeError, t.write, "\xff")
2534 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002535 b = self.BytesIO()
2536 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002537 self.assertRaises(UnicodeError, t.write, "\xff")
2538 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002539 b = self.BytesIO()
2540 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002541 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002542 t.write("abc\xffdef\n")
2543 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002544 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002545 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002546 b = self.BytesIO()
2547 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002548 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002549 t.write("abc\xffdef\n")
2550 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002551 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002552
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002553 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002554 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2555
2556 tests = [
2557 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002558 [ '', input_lines ],
2559 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2560 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2561 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002562 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002563 encodings = (
2564 'utf-8', 'latin-1',
2565 'utf-16', 'utf-16-le', 'utf-16-be',
2566 'utf-32', 'utf-32-le', 'utf-32-be',
2567 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002568
Guido van Rossum8358db22007-08-18 21:39:55 +00002569 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002570 # character in TextIOWrapper._pending_line.
2571 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002572 # XXX: str.encode() should return bytes
2573 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002574 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002575 for bufsize in range(1, 10):
2576 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002577 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2578 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002579 encoding=encoding)
2580 if do_reads:
2581 got_lines = []
2582 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002583 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002584 if c2 == '':
2585 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002586 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002587 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002588 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002589 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002590
2591 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002592 self.assertEqual(got_line, exp_line)
2593 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002594
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002595 def test_newlines_input(self):
2596 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002597 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2598 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002599 (None, normalized.decode("ascii").splitlines(keepends=True)),
2600 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002601 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2602 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2603 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002604 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002605 buf = self.BytesIO(testdata)
2606 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002607 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002608 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002609 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002610
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002611 def test_newlines_output(self):
2612 testdict = {
2613 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2614 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2615 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2616 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2617 }
2618 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2619 for newline, expected in tests:
2620 buf = self.BytesIO()
2621 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2622 txt.write("AAA\nB")
2623 txt.write("BB\nCCC\n")
2624 txt.write("X\rY\r\nZ")
2625 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002626 self.assertEqual(buf.closed, False)
2627 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002628
def test_destructor(self):
    seen_at_close = []
    base = self.BytesIO

    class MyBytesIO(base):
        def close(self):
            # Record what had reached the stream when it was closed.
            seen_at_close.append(self.getvalue())
            super().close()

    raw = MyBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    # Dropping the wrapper must flush the pending text and close the
    # stream.
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002642
def test_override_destructor(self):
    calls = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # Chain to the parent destructor only if it defines one.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    # Expected order: __del__, then close(), then flush().
    self.assertEqual(calls, [1, 2, 3])
2665
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception OSError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Guido van Rossum8358db22007-08-18 21:39:55 +00002680
Guido van Rossum9b76da62007-04-11 01:09:03 +00002681 # Systematic tests of the text I/O API
2682
def test_basic_io(self):
    # Round-trip a small text through write/read/seek/tell for several
    # chunk sizes and encodings.  The files are opened with "with" so a
    # failing assertion cannot leave TESTFN open.
    chunk_sizes = (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65)
    for chunksize in chunk_sizes:
        for enc in "ascii", "latin-1", "utf-8":  # , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Cookie for the end of the three characters just read.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                # Append and read the new text back from the old end.
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2711
def multi_line_test(self, f, enc):
    # Empty the file, write lines of many different lengths while
    # recording the tell() value before each, then read them back and
    # check that positions and contents match.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    sizes = (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000)
    wlines = []
    for size in sizes:
        # Repeat the sample characters cyclically up to the wanted length.
        line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        wlines.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    rlines = []
    pos = f.tell()
    line = f.readline()
    while line:
        rlines.append((pos, line))
        pos = f.tell()
        line = f.readline()
    self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002733
def test_telling(self):
    # The file is opened with "with" so a failing assertion cannot leave
    # TESTFN open.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        # Record the position before and after each written line.
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to fail while iteration is in progress.
            self.assertRaises(OSError, f.tell)
        self.assertEqual(f.tell(), p2)
2753
def test_seeking(self):
    # Each line is an ASCII prefix two characters shorter than the
    # default chunk size, followed by a multi-byte character and a
    # newline.
    prefix_size = _default_chunk_size() - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    line = prefix + u_suffix.encode("utf-8")
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        head = f.read(prefix_size)
        self.assertEqual(head, str(prefix, "ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002770
def test_seeking_too(self):
    # Regression test for a specific bug
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as out:
        out.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        getattr(f, "_CHUNK_SIZE")  # Just test that it exists
        # A chunk smaller than the three-byte character in the file.
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002781
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened in a ``with`` block so that the handle is
        # closed even when one of the assertions below fails.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN, encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Going back to the cookie must replay the same text.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for raw_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(raw_input)

        # Position each test case so that it crosses a chunk boundary.
        for raw_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(raw_input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + raw_input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002828
def test_encoded_writes(self):
    payload = "1234567890"
    doubled = payload * 2
    # utf-16, utf-16-le, utf-16-be, utf-32, utf-32-le, utf-32-be
    encodings = [family + variant
                 for family in ("utf-16", "utf-32")
                 for variant in ("", "-le", "-be")]
    for encoding in encodings:
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding=encoding)
        # Check if the BOM is written only once (see issue1753).
        wrapper.write(payload)
        wrapper.write(payload)
        # Read the text back twice from the start.
        for _ in range(2):
            wrapper.seek(0)
            self.assertEqual(wrapper.read(), doubled)
        self.assertEqual(raw.getvalue(), doubled.encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002848
def test_unreadable(self):
    # Reading through a wrapper whose buffer reports readable() == False
    # must raise OSError.
    class UnReadable(self.BytesIO):
        def readable(self):
            return False

    wrapper = self.TextIOWrapper(UnReadable())
    with self.assertRaises(OSError):
        wrapper.read()
Benjamin Petersona1b49012009-03-31 23:11:32 +00002855
def test_read_one_by_one(self):
    # Read one character at a time until read() returns an empty string;
    # the "\r\n" pair must come back as a single "\n".
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    collected = ""
    for char in iter(lambda: wrapper.read(1), ""):
        collected += char
    self.assertEqual(collected, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002865
def test_readlines(self):
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]
    # No hint and an explicit None hint both return every line.
    self.assertEqual(wrapper.readlines(), every_line)
    wrapper.seek(0)
    self.assertEqual(wrapper.readlines(None), every_line)
    wrapper.seek(0)
    # A hint of 5 stops after the second line.
    self.assertEqual(wrapper.readlines(5), every_line[:2])
2873
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002874 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    wrapper = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    collected = ""
    # Keep asking for 128 characters until read() returns an empty string.
    for piece in iter(lambda: wrapper.read(128), ""):
        collected += piece
    self.assertEqual(collected, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002885
def test_writelines(self):
    # writelines() with a plain list writes the items back to back.
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink)
    wrapper.writelines(['ab', 'cd', 'ef'])
    wrapper.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
2893
def test_writelines_userlist(self):
    # writelines() also accepts a UserList, not only a real list.
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink)
    wrapper.writelines(UserList(['ab', 'cd', 'ef']))
    wrapper.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
2901
def test_writelines_error(self):
    # Non-str items, a non-iterable, and a bytes object are all rejected.
    wrapper = self.TextIOWrapper(self.BytesIO())
    for bad_lines in ([1, 2, 3], None, b'abc'):
        self.assertRaises(TypeError, wrapper.writelines, bad_lines)
2907
def test_issue1395_1(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

    # read one char at a time
    collected = ""
    for char in iter(lambda: wrapper.read(1), ""):
        collected += char
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002919
def test_issue1395_2(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # Read four characters at a time, matching the chunk size.
    collected = ""
    for piece in iter(lambda: wrapper.read(4), ""):
        collected += piece
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002931
def test_issue1395_3(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # Two fixed-size reads followed by three readline() calls.
    pieces = [wrapper.read(4), wrapper.read(4)]
    for _ in range(3):
        pieces.append(wrapper.readline())
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002942
def test_issue1395_4(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # A fixed-size read followed by a read of everything that is left.
    head = wrapper.read(4)
    tail = wrapper.read()
    self.assertEqual(head + tail, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002950
def test_issue1395_5(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # Skip the first four characters and remember where that leaves us.
    wrapper.read(4)
    cookie = wrapper.tell()
    # Rewind, then return to the remembered position.
    wrapper.seek(0)
    wrapper.seek(cookie)
    self.assertEqual(wrapper.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002960
def test_issue2282(self):
    # The wrapper must report the same seekable() value as its buffer.
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    self.assertEqual(raw.seekable(), wrapper.seekable())
2966
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            # The position is queried here but not used afterwards.
            writer.tell()
        with self.open(path, 'rb') as reader:
            self.assertEqual(reader.read(), 'aaa'.encode(charset))

        with self.open(path, 'a', encoding=charset) as appender:
            appender.write('xxx')
        with self.open(path, 'rb') as reader:
            self.assertEqual(reader.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002981
def test_seek_bom(self):
    # The BOM is not written again when seeking manually
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            end_cookie = writer.tell()
        with self.open(path, 'r+', encoding=charset) as updater:
            # Write at the old end of the text first, then overwrite the
            # start.
            updater.seek(end_cookie)
            updater.write('zzz')
            updater.seek(0)
            updater.write('bbb')
        with self.open(path, 'rb') as reader:
            self.assertEqual(reader.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002996
def test_seek_append_bom(self):
    # The BOM is not written again when appending after first seeking to
    # the start and then to the end
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
        with self.open(path, 'a', encoding=charset) as appender:
            appender.seek(0)
            appender.seek(0, self.SEEK_END)
            appender.write('xxx')
        with self.open(path, 'rb') as reader:
            self.assertEqual(reader.read(), 'aaaxxx'.encode(charset))
3009
def test_errors_property(self):
    # The errors attribute is "strict" by default and otherwise reflects
    # the errors= argument passed to open().
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for open_kwargs, expected in cases:
        with self.open(support.TESTFN, "w", **open_kwargs) as stream:
            self.assertEqual(stream.errors, expected)
3015
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    worker_count = 20
    start_signal = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as shared:
        def run(n):
            # Build the line first, then block until all workers may go.
            line = "Thread%03d\n" % n
            start_signal.wait()
            shared.write(line)

        workers = []
        for index in range(worker_count):
            workers.append(threading.Thread(target=run, args=(index,)))
        with support.start_threads(workers, start_signal.set):
            time.sleep(0.02)
    with self.open(support.TESTFN) as result:
        content = result.read()
        # Every worker's line must appear exactly once.
        for index in range(worker_count):
            self.assertEqual(content.count("Thread%03d\n" % index), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003034
def test_flush_error_on_close(self):
    # Test that text file is closed despite failed flush
    # and that flush() is called before file closed.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    # Filled by bad_flush() with the closed flags seen at flush time.
    seen_closed = []

    def bad_flush():
        seen_closed[:] = [wrapper.closed, wrapper.buffer.closed]
        raise OSError()

    wrapper.flush = bad_flush
    with self.assertRaises(OSError):  # exception not swallowed
        wrapper.close()
    self.assertTrue(wrapper.closed)
    self.assertTrue(wrapper.buffer.closed)
    self.assertTrue(seen_closed)      # flush() called
    self.assertFalse(seen_closed[0])  # flush() called before file closed
    self.assertFalse(seen_closed[1])
    wrapper.flush = lambda: None  # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003051
def test_close_error_on_close(self):
    # Both flush() and the buffer's close() fail: the close error must
    # propagate with the flush error attached as its __context__.
    def bad_close():
        raise OSError('close')

    def bad_flush():
        raise OSError('flush')

    raw = self.BytesIO(self.testdata)
    raw.close = bad_close
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.flush = bad_flush
    with self.assertRaises(OSError) as caught:  # exception not swallowed
        wrapper.close()
    failure = caught.exception
    self.assertEqual(failure.args, ('close',))
    self.assertIsInstance(failure.__context__, OSError)
    self.assertEqual(failure.__context__.args, ('flush',))
    self.assertFalse(wrapper.closed)
3067
def test_nonnormalized_close_error_on_close(self):
    # Issue #21677
    # The undefined names below are deliberate: evaluating them raises
    # NameError, and the messages are checked for those names.
    def bad_close():
        raise non_existing_close

    def bad_flush():
        raise non_existing_flush

    raw = self.BytesIO(self.testdata)
    raw.close = bad_close
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.flush = bad_flush
    with self.assertRaises(NameError) as caught:  # exception not swallowed
        wrapper.close()
    failure = caught.exception
    self.assertIn('non_existing_close', str(failure))
    self.assertIsInstance(failure.__context__, NameError)
    self.assertIn('non_existing_flush', str(failure.__context__))
    self.assertFalse(wrapper.closed)
3084
def test_multi_close(self):
    # close() may be called repeatedly; flush() afterwards raises
    # ValueError.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    for _ in range(3):
        wrapper.close()
    with self.assertRaises(ValueError):
        wrapper.flush()
3091
def test_unseekable(self):
    # tell() and seek() on a wrapper around an unseekable stream must
    # raise UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    unsupported = self.UnsupportedOperation
    self.assertRaises(unsupported, wrapper.tell)
    self.assertRaises(unsupported, wrapper.seek, 0)
3096
def test_readonly_attributes(self):
    # Assigning to the buffer attribute must raise AttributeError.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    with self.assertRaises(AttributeError):
        wrapper.buffer = replacement
3102
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks),
                                 encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(wrapper.read(4), 'abcd')
    self.assertEqual(wrapper.readline(), 'efghi\n')
    self.assertEqual(list(wrapper), ['jkl\n', 'opq\n'])
3113
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for piece in ('1', '23\n4', '5'):
        wrapper.write(piece)
    self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3123
def test_bufio_write_through(self):
    # Issue #21396: write_through=True doesn't force a flush()
    # on the underlying binary buffered object.
    flush_log, write_log = [], []

    class RecordingWriter(self.BufferedWriter):
        # Records every flush() and write() call before delegating.
        def flush(self, *args, **kwargs):
            flush_log.append(True)
            return super().flush(*args, **kwargs)

        def write(self, *args, **kwargs):
            write_log.append(True)
            return super().write(*args, **kwargs)

    raw = self.BytesIO()
    payload = b"a"
    # The buffer is twice the payload size, so one write does not fill it.
    buffered = RecordingWriter(raw, len(payload)*2)
    wrapper = self.TextIOWrapper(buffered, encoding='ascii',
                                 write_through=True)
    # write to the buffered io but don't overflow the buffer
    text = payload.decode('ascii')
    wrapper.write(text)

    # buffer.flush is not called with write_through=True
    self.assertFalse(flush_log)
    # buffer.write *is* called with write_through=True
    self.assertTrue(write_log)
    self.assertEqual(raw.getvalue(), b"")  # no flush

    write_log = []  # reset
    wrapper.write(text * 10)  # total content is larger than bufio buffer
    self.assertTrue(write_log)
    self.assertEqual(raw.getvalue(), payload * 11)  # all flushed
3155
def test_read_nonbytes(self):
    # Issue #17106
    # Crash when underlying read() returns non-bytes
    attempts = (
        ("read", (1,)),
        ("readline", ()),
        ("read", ()),
    )
    for method_name, call_args in attempts:
        # A fresh wrapper for every attempt.
        wrapper = self.TextIOWrapper(self.StringIO('a'))
        self.assertRaises(TypeError, getattr(wrapper, method_name),
                          *call_args)
3165
def test_illegal_decoder(self):
    # Issue #17106
    def _make_illegal_wrapper():
        # Bypass the early encoding check added in issue 20404 by flagging
        # the codec as a text encoding while the wrapper is built.
        codec_info = codecs.lookup("quopri")
        codec_info._is_text_encoding = True
        try:
            return self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
                                      newline='\n', encoding="quopri")
        finally:
            codec_info._is_text_encoding = False

    # Crash when decoder returns non-string
    attempts = (
        ("read", (1,)),
        ("readline", ()),
        ("read", ()),
    )
    for method_name, call_args in attempts:
        wrapper = _make_illegal_wrapper()
        self.assertRaises(TypeError, getattr(wrapper, method_name),
                          *call_args)
3185
def _check_create_at_shutdown(self, **kwargs):
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    # The script runs in a child interpreter; its C instance builds a
    # TextIOWrapper, with the given kwargs, inside __del__.
    script = """if 1:
            import codecs
            import {iomod} as io

            # Avoid looking up codecs at shutdown
            codecs.lookup('utf-8')

            class C:
                def __init__(self):
                    self.buf = io.BytesIO()
                def __del__(self):
                    io.TextIOWrapper(self.buf, **{kwargs})
                    print("ok")
            c = C()
            """.format(iomod=self.io.__name__, kwargs=kwargs)
    return assert_python_ok("-c", script)
3206
@support.requires_type_collecting
def test_create_at_shutdown_without_encoding(self):
    _, stdout, stderr = self._check_create_at_shutdown()
    if not stderr:
        self.assertEqual("ok", stdout.decode().strip())
    else:
        # Can error out with a RuntimeError if the module state
        # isn't found.
        self.assertIn(self.shutdown_error, stderr.decode())
3216
@support.requires_type_collecting
def test_create_at_shutdown_with_encoding(self):
    # With an explicit encoding and error handler the child must print
    # "ok" and leave stderr empty.
    wrapper_kwargs = {"encoding": 'utf-8', "errors": 'strict'}
    _, stdout, stderr = self._check_create_at_shutdown(**wrapper_kwargs)
    self.assertFalse(stderr)
    self.assertEqual("ok", stdout.decode().strip())
3223
def test_read_byteslike(self):
    source = MemviewBytesIO(b'Just some random string\n')
    wrapper = self.TextIOWrapper(source, 'utf-8')

    # TextIOwrapper will not read the full string, because
    # we truncate it to a multiple of the native int size
    # so that we can construct a more complex memoryview.
    truncated = _to_memoryview(source.getvalue()).tobytes()
    expected = truncated.decode('utf-8')

    self.assertEqual(wrapper.read(200), expected)
3234
def test_issue22849(self):
    class F(object):
        # Claims every capability but provides no other method.
        def readable(self):
            return True

        def writable(self):
            return True

        def seekable(self):
            return True

    # Build the wrapper repeatedly, ignoring any failure.
    for _ in range(10):
        try:
            self.TextIOWrapper(F(), encoding='utf-8')
        except Exception:
            pass

    # Once F has a tell() method, construction is expected to succeed.
    F.tell = lambda x: 0
    self.TextIOWrapper(F(), encoding='utf-8')
3249
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003250
class MemviewBytesIO(io.BytesIO):
    '''A BytesIO object whose read methods return memoryviews
       rather than bytes.

       As with io.BytesIO, the size argument of read() and read1() is
       optional; the default of -1 reads everything that is left.
    '''

    def read1(self, len_=-1):
        '''Like BytesIO.read1(), but return the data as a memoryview.'''
        return _to_memoryview(super().read1(len_))

    def read(self, len_=-1):
        '''Like BytesIO.read(), but return the data as a memoryview.'''
        return _to_memoryview(super().read(len_))


def _to_memoryview(buf):
    '''Convert bytes-object *buf* to a non-trivial memoryview.

    The bytes are copied into an array of C ints, so trailing bytes that
    do not fill a whole array item are dropped.
    '''
    arr = array.array('i')
    # Largest prefix of buf whose length is a multiple of the item size.
    idx = len(buf) - len(buf) % arr.itemsize
    arr.frombytes(buf[:idx])
    return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003268
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003269
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapper tests bound to the ``io`` module."""

    io = io
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        # After a failed re-initialisation the wrapper refuses to read.
        with self.assertRaises(ValueError):
            wrapper.__init__(buffered, newline='xyzzy')
        with self.assertRaises(ValueError):
            wrapper.read()

        # repr() of an object created without __init__ must raise.
        bare = self.TextIOWrapper.__new__(self.TextIOWrapper)
        with self.assertRaises(Exception):
            repr(bare)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            raw = io.FileIO(support.TESTFN, "wb")
            buffered = self.BufferedWriter(raw)
            wrapper = self.TextIOWrapper(buffered, encoding="ascii")
            wrapper.write("456def")
            # Self-reference, so only the cycle collector can free it.
            wrapper.x = wrapper
            ref = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as reader:
            self.assertEqual(reader.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()
3314
3315
class PyTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapper tests bound to the ``pyio`` module."""

    # Message looked for on stderr by
    # test_create_at_shutdown_without_encoding.
    shutdown_error = "LookupError: unknown encoding: ascii"
    io = pyio
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003319
3320
class IncrementalNewlineDecoderTest(unittest.TestCase):
    """Tests for the IncrementalNewlineDecoder class found on ``self``."""

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(raw, expected, **options):
            # We exercise getstate() / setstate() as well as decode()
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(raw, **options), expected)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(raw, **options), expected)

        def _check_all(steps):
            # Each step is (input bytes, expected text, decode() kwargs).
            for raw, expected, options in steps:
                _check_decode(raw, expected, **options)

        final = {"final": True}

        # A three-byte character, whole and then twice byte by byte.
        _check_all([
            (b'\xe8\xa2\x88', "\u8888", {}),

            (b'\xe8', "", {}),
            (b'\xa2', "", {}),
            (b'\x88', "\u8888", {}),

            (b'\xe8', "", {}),
            (b'\xa2', "", {}),
            (b'\x88', "\u8888", {}),

            (b'\xe8', "", {}),
        ])
        # A truncated sequence is an error once the input is final.
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        # Newline translation, including a pending "\r".
        _check_all([
            (b'\n', "\n", {}),
            (b'\r', "", {}),
            (b'', "\n", final),
            (b'\r', "\n", final),

            (b'\r', "", {}),
            (b'a', "\na", {}),

            (b'\r\r\n', "\n\n", {}),
            (b'\r', "", {}),
            (b'\r', "\n", {}),
            (b'\na', "\na", {}),

            (b'\xe8\xa2\x88\r\n', "\u8888\n", {}),
            (b'\xe8\xa2\x88', "\u8888", {}),
            (b'\n', "\n", {}),
            (b'\xe8\xa2\x88\r', "\u8888", {}),
            (b'\n', "\n", {}),
        ])

    def check_newline_decoding(self, decoder, encoding):
        decoded_parts = []
        if encoding is None:
            encoder = None

            def _units(text):
                # Decode one char at a time
                return text
        else:
            encoder = codecs.getincrementalencoder(encoding)()

            def _units(text):
                # Decode one byte at a time
                return (bytes([byte]) for byte in encoder.encode(text))

        def _decode_piecewise(text):
            for unit in _units(text):
                decoded_parts.append(decoder.decode(unit))

        self.assertEqual(decoder.newlines, None)
        # (text to feed, value of decoder.newlines expected afterwards)
        steps = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, seen_newlines in steps:
            _decode_piecewise(text)
            self.assertEqual(decoder.newlines, seen_newlines)
        _decode_piecewise("abc\r")
        self.assertEqual("".join(decoded_parts), "abc\n\nabcabc\nabcabc")

        # After reset() the decoder no longer remembers any newline.
        decoder.reset()
        payload = "abc"
        if encoder is not None:
            encoder.reset()
            payload = encoder.encode(payload)
        self.assertEqual(decoder.decode(payload), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            inner = codecs.getincrementaldecoder(enc)() if enc else None
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(wrapped)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        for translate in (False, True):
            dec = self.IncrementalNewlineDecoder(None, translate=translate)
            self.assertEqual(dec.newlines, None)
            # Both characters pass through and record no newline.
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)
3426
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest subclass that adds nothing itself."""
3429
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest subclass that adds nothing itself."""
Antoine Pitrou180a3362008-12-14 16:36:46 +00003432
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003433
Guido van Rossum01a27522007-03-07 01:00:12 +00003434# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003435
Guido van Rossum5abbf752007-08-27 17:39:33 +00003436class MiscIOTest(unittest.TestCase):
3437
def tearDown(self):
    # Remove the scratch file after every test.
    scratch_path = support.TESTFN
    support.unlink(scratch_path)
3440
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003441 def test___all__(self):
3442 for name in self.io.__all__:
3443 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003444 self.assertIsNotNone(obj, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00003445 if name == "open":
3446 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003447 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003448 self.assertTrue(issubclass(obj, Exception), name)
3449 elif not name.startswith("SEEK_"):
3450 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003451
Barry Warsaw40e82462008-11-20 20:14:50 +00003452 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003453 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003454 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003455 f.close()
3456
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003457 with support.check_warnings(('', DeprecationWarning)):
3458 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003459 self.assertEqual(f.name, support.TESTFN)
3460 self.assertEqual(f.buffer.name, support.TESTFN)
3461 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3462 self.assertEqual(f.mode, "U")
3463 self.assertEqual(f.buffer.mode, "rb")
3464 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003465 f.close()
3466
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003467 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003468 self.assertEqual(f.mode, "w+")
3469 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3470 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003471
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003472 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003473 self.assertEqual(g.mode, "wb")
3474 self.assertEqual(g.raw.mode, "wb")
3475 self.assertEqual(g.name, f.fileno())
3476 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003477 f.close()
3478 g.close()
3479
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003480 def test_io_after_close(self):
3481 for kwargs in [
3482 {"mode": "w"},
3483 {"mode": "wb"},
3484 {"mode": "w", "buffering": 1},
3485 {"mode": "w", "buffering": 2},
3486 {"mode": "wb", "buffering": 0},
3487 {"mode": "r"},
3488 {"mode": "rb"},
3489 {"mode": "r", "buffering": 1},
3490 {"mode": "r", "buffering": 2},
3491 {"mode": "rb", "buffering": 0},
3492 {"mode": "w+"},
3493 {"mode": "w+b"},
3494 {"mode": "w+", "buffering": 1},
3495 {"mode": "w+", "buffering": 2},
3496 {"mode": "w+b", "buffering": 0},
3497 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003498 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003499 f.close()
3500 self.assertRaises(ValueError, f.flush)
3501 self.assertRaises(ValueError, f.fileno)
3502 self.assertRaises(ValueError, f.isatty)
3503 self.assertRaises(ValueError, f.__iter__)
3504 if hasattr(f, "peek"):
3505 self.assertRaises(ValueError, f.peek, 1)
3506 self.assertRaises(ValueError, f.read)
3507 if hasattr(f, "read1"):
3508 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003509 if hasattr(f, "readall"):
3510 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003511 if hasattr(f, "readinto"):
3512 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003513 if hasattr(f, "readinto1"):
3514 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003515 self.assertRaises(ValueError, f.readline)
3516 self.assertRaises(ValueError, f.readlines)
Xiang Zhangd5fa5f32017-04-15 13:25:15 +08003517 self.assertRaises(ValueError, f.readlines, 1)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003518 self.assertRaises(ValueError, f.seek, 0)
3519 self.assertRaises(ValueError, f.tell)
3520 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003521 self.assertRaises(ValueError, f.write,
3522 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003523 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003524 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003525
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003526 def test_blockingioerror(self):
3527 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003528 class C(str):
3529 pass
3530 c = C("")
3531 b = self.BlockingIOError(1, c)
3532 c.b = b
3533 b.c = c
3534 wr = weakref.ref(c)
3535 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003536 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003537 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003538
3539 def test_abcs(self):
3540 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003541 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3542 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3543 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3544 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003545
3546 def _check_abc_inheritance(self, abcmodule):
3547 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003548 self.assertIsInstance(f, abcmodule.IOBase)
3549 self.assertIsInstance(f, abcmodule.RawIOBase)
3550 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3551 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003552 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003553 self.assertIsInstance(f, abcmodule.IOBase)
3554 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3555 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3556 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003557 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003558 self.assertIsInstance(f, abcmodule.IOBase)
3559 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3560 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3561 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003562
3563 def test_abc_inheritance(self):
3564 # Test implementations inherit from their respective ABCs
3565 self._check_abc_inheritance(self)
3566
3567 def test_abc_inheritance_official(self):
3568 # Test implementations inherit from the official ABCs of the
3569 # baseline "io" module.
3570 self._check_abc_inheritance(io)
3571
Antoine Pitroue033e062010-10-29 10:38:18 +00003572 def _check_warn_on_dealloc(self, *args, **kwargs):
3573 f = open(*args, **kwargs)
3574 r = repr(f)
3575 with self.assertWarns(ResourceWarning) as cm:
3576 f = None
3577 support.gc_collect()
3578 self.assertIn(r, str(cm.warning.args[0]))
3579
3580 def test_warn_on_dealloc(self):
3581 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3582 self._check_warn_on_dealloc(support.TESTFN, "wb")
3583 self._check_warn_on_dealloc(support.TESTFN, "w")
3584
3585 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3586 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003587 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003588 for fd in fds:
3589 try:
3590 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003591 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003592 if e.errno != errno.EBADF:
3593 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003594 self.addCleanup(cleanup_fds)
3595 r, w = os.pipe()
3596 fds += r, w
3597 self._check_warn_on_dealloc(r, *args, **kwargs)
3598 # When using closefd=False, there's no warning
3599 r, w = os.pipe()
3600 fds += r, w
Serhiy Storchaka94a619d2016-02-11 13:11:44 +02003601 with support.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00003602 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00003603
3604 def test_warn_on_dealloc_fd(self):
3605 self._check_warn_on_dealloc_fd("rb", buffering=0)
3606 self._check_warn_on_dealloc_fd("rb")
3607 self._check_warn_on_dealloc_fd("r")
3608
3609
Antoine Pitrou243757e2010-11-05 21:15:39 +00003610 def test_pickling(self):
3611 # Pickling file objects is forbidden
3612 for kwargs in [
3613 {"mode": "w"},
3614 {"mode": "wb"},
3615 {"mode": "wb", "buffering": 0},
3616 {"mode": "r"},
3617 {"mode": "rb"},
3618 {"mode": "rb", "buffering": 0},
3619 {"mode": "w+"},
3620 {"mode": "w+b"},
3621 {"mode": "w+b", "buffering": 0},
3622 ]:
3623 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3624 with self.open(support.TESTFN, **kwargs) as f:
3625 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3626
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003627 def test_nonblock_pipe_write_bigbuf(self):
3628 self._test_nonblock_pipe_write(16*1024)
3629
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003630 def test_nonblock_pipe_write_smallbuf(self):
3631 self._test_nonblock_pipe_write(1024)
3632
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003633 @unittest.skipUnless(hasattr(os, 'set_blocking'),
3634 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003635 def _test_nonblock_pipe_write(self, bufsize):
3636 sent = []
3637 received = []
3638 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003639 os.set_blocking(r, False)
3640 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003641
3642 # To exercise all code paths in the C implementation we need
3643 # to play with buffer sizes. For instance, if we choose a
3644 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3645 # then we will never get a partial write of the buffer.
3646 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3647 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3648
3649 with rf, wf:
3650 for N in 9999, 73, 7574:
3651 try:
3652 i = 0
3653 while True:
3654 msg = bytes([i % 26 + 97]) * N
3655 sent.append(msg)
3656 wf.write(msg)
3657 i += 1
3658
3659 except self.BlockingIOError as e:
3660 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003661 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003662 sent[-1] = sent[-1][:e.characters_written]
3663 received.append(rf.read())
3664 msg = b'BLOCKED'
3665 wf.write(msg)
3666 sent.append(msg)
3667
3668 while True:
3669 try:
3670 wf.flush()
3671 break
3672 except self.BlockingIOError as e:
3673 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003674 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003675 self.assertEqual(e.characters_written, 0)
3676 received.append(rf.read())
3677
3678 received += iter(rf.read, None)
3679
3680 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003681 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003682 self.assertTrue(wf.closed)
3683 self.assertTrue(rf.closed)
3684
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003685 def test_create_fail(self):
3686 # 'x' mode fails if file is existing
3687 with self.open(support.TESTFN, 'w'):
3688 pass
3689 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3690
3691 def test_create_writes(self):
3692 # 'x' mode opens for writing
3693 with self.open(support.TESTFN, 'xb') as f:
3694 f.write(b"spam")
3695 with self.open(support.TESTFN, 'rb') as f:
3696 self.assertEqual(b"spam", f.read())
3697
Christian Heimes7b648752012-09-10 14:48:43 +02003698 def test_open_allargs(self):
3699 # there used to be a buffer overflow in the parser for rawmode
3700 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3701
3702
class CMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the ``io`` module, plus checks specific to it."""

    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        # A read() that returns far more data than requested must make
        # readinto() fail with ValueError.
        class BadReader(self.io.BufferedIOBase):
            def read(self, n=-1):
                return b'x' * 10**6
        reader = BadReader()
        target = bytearray(2)
        with self.assertRaises(ValueError):
            reader.readinto(target)

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        """Run a child interpreter that writes to sys.<stream_name> from both
        a daemon thread and the main thread, then check how it exited.

        The child must either exit cleanly having printed only '.' and '!',
        or die with the expected "could not acquire lock" fatal error.
        """
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        code = """if 1:
            import sys
            import time
            import threading
            from test.support import SuppressCrashReport

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            crash = SuppressCrashReport()
            crash.__enter__()
            # don't call __exit__(): the crash occurs at Python shutdown

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """.format(stream_name=stream_name)
        res, _ = run_python_until_end("-c", code)
        err = res.err.decode()
        if res.rc == 0:
            # Clean exit: stderr may hold nothing but the '.' and '!' output.
            self.assertFalse(err.strip('.!'))
        else:
            # Failure: should be a fatal error
            expected = ("Fatal Python error: could not acquire lock "
                        "for <_io.BufferedWriter name='<{stream_name}>'> "
                        "at interpreter shutdown, possibly due to "
                        "daemon threads".format(stream_name=stream_name))
            self.assertIn(expected, err)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stdout')

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stderr')
3761
3762
class PyMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the ``pyio`` module."""

    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003765
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003766
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Check how file objects behave when SIGALRM interrupts reads/writes.

    The implementation under test is reached through ``self.io``, which the
    concrete subclasses provide.  Every helper that arms an alarm cancels it
    again in a ``finally`` clause so that a failing test cannot leave a
    pending SIGALRM behind for whatever runs next.
    """

    def setUp(self):
        # Install the raising handler and remember the previous one.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raise ZeroDivisionError from the signal handler.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit of data written (repeated to exceed the pipe
        capacity); *bytes* is its expected form on the pipe.
        *fdopen_kwargs* are passed to ``self.io.open()``.
        """
        read_results = []
        def _read():
            # Keep SIGALRM away from this thread where the platform allows.
            if hasattr(signal, 'pthread_sigmask'):
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        # Bound up front so the cleanup below cannot hit an unbound name if
        # open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed just below.  Since
            # the other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                signal.alarm(0)
                t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    # Issue #22331: The test hangs on FreeBSD 7.2
    @support.requires_freebsd_version(8)
    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check that a write() issued from a signal handler while the same
        file object is already writing either raises RuntimeError
        ("reentrant call ...") or lets the handler's ZeroDivisionError
        propagate.
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Cancel the alarm in case the loop ended before it fired.
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read() to str for the
        comparison; *fdopen_kwargs* are passed to ``self.io.open()``.
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound up front so the cleanup below cannot hit an unbound name if
        # open() itself fails.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Cancel the alarm in case the read ended before it fired.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is a one-element str or bytes that is repeated
        ``support.PIPE_MAX_SIZE`` times; *fdopen_kwargs* are passed to
        ``self.io.open()``.
        """
        select = support.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        error = None
        def _read():
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                # Hand the failure to the main thread, which asserts on it.
                nonlocal error
                error = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: re-arm with the handler that starts the reader.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        # Bound up front so the cleanup below cannot hit an unbound name if
        # open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error)
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Cancel any alarm that has not fired yet.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3974
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003975
class CSignalsTest(SignalsTest):
    """SignalsTest bound to the ``io`` module."""

    io = io
3978
class PySignalsTest(SignalsTest):
    """SignalsTest bound to the ``pyio`` module, minus the reentrancy tests."""

    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3986
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003987
def load_tests(*args):
    """Build the suite for this module (unittest ``load_tests`` hook).

    Each test class whose name starts with "C" or "Py" gets the public names
    of ``io`` or ``pyio`` respectively, plus the matching mock classes,
    attached as class attributes.  The arguments passed by unittest are
    accepted but not used.

    Returns a ``unittest.TestSuite`` holding the tests of every listed class.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    globs = globals()
    # Each mock Foo is looked up as the module-level names CFoo and PyFoo.
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # unittest.makeSuite() is deprecated and gone from recent Python
    # versions; a fresh TestLoader with default settings collects the same
    # tests.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite([loader.loadTestsFromTestCase(test)
                                for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004023
# Run the module's tests when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()