"""Unit tests for the io module."""

# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Berker Peksagce643912015-05-06 06:33:17 +030038from test.support.script_helper import assert_python_ok, run_python_until_end
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000047
try:
    import ctypes
except ImportError:
    def byteslike(*pos, **kw):
        """Create a bytes-like object having no string or sequence methods.

        Fallback used when ctypes cannot be imported: the arguments are
        forwarded to bytes() and the result is wrapped in an array of
        signed bytes.
        """
        return array.array("b", bytes(*pos, **kw))
else:
    class EmptyStruct(ctypes.Structure):
        """Field-less structure used only as a resizable raw buffer."""

    def byteslike(*pos, **kw):
        """Create a bytes-like object having no string or sequence methods.

        The arguments are forwarded to bytes(); the resulting data is then
        copied into an EmptyStruct instance resized to the same length.
        """
        data = bytes(*pos, **kw)
        obj = EmptyStruct()
        ctypes.resize(obj, len(data))
        # Copy the payload through a byte-typed view of the structure.
        memoryview(obj).cast("B")[:] = data
        return obj
63
def _default_chunk_size():
    """Get the default TextIOWrapper chunk size.

    Opens this very file in text mode and returns the ``_CHUNK_SIZE``
    attribute of the resulting text wrapper.
    """
    with open(__file__, "r", encoding="latin-1") as f:
        return f._CHUNK_SIZE
68
69
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    ``read_stack`` is a sequence of chunks handed out by readinto() in
    order; a ``None`` entry makes one readinto() call return None.
    Everything passed to write() is recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0             # number of read calls made
        self._extraneous_reads = 0  # read calls made after data ran out

    def write(self, b):
        """Record a bytes copy of *b* and report it fully written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Fill *buf* from the next queued chunk.

        Returns the number of bytes stored, 0 once the queue is exhausted,
        or None when the next queued entry is None.  A chunk longer than
        *buf* is split: its remainder stays at the head of the queue.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            del self._read_stack[0]
            buf[:n] = data
            return n
        else:
            buf[:] = data[:max_len]
            self._read_stack[0] = data[max_len:]
            return max_len

    def truncate(self, pos=None):
        """Pretend to truncate; simply echoes *pos* back."""
        return pos
125
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead on top of the C io.RawIOBase."""
128
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead on top of the pure-Python _pyio.RawIOBase."""
131
132
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus an explicit read() method."""

    def read(self, n=None):
        """Return the next queued chunk whole (*n* is ignored).

        Returns b"" and counts an extraneous read once the queue is empty.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue counts as an extraneous read; anything
            # else (e.g. KeyboardInterrupt) must propagate.
            self._extraneous_reads += 1
            return b""
142
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO on top of the C io.RawIOBase."""
145
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO on top of the pure-Python _pyio.RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000148
Guido van Rossuma9e20242007-03-08 00:43:48 +0000149
class MisbehavedRawIO(MockRawIO):
    """A MockRawIO whose methods deliberately return bogus results."""

    def write(self, b):
        # Claims twice as many bytes as were actually written.
        return super().write(b) * 2

    def read(self, n=None):
        # Returns the data duplicated.
        return super().read(n) * 2

    def seek(self, pos, whence):
        # Negative position.
        return -123

    def tell(self):
        # Negative position.
        return -456

    def readinto(self, buf):
        # Performs the real read but reports more bytes than the buffer holds.
        super().readinto(buf)
        return len(buf) * 5
166
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO on top of the C io.RawIOBase."""
169
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO on top of the pure-Python _pyio.RawIOBase."""
172
173
class CloseFailureIO(MockRawIO):
    """A MockRawIO whose first close() call raises OSError."""

    # Shadows the base ``closed`` attribute with a plain flag.
    closed = 0

    def close(self):
        """Mark the object closed, then raise OSError (first call only)."""
        if not self.closed:
            self.closed = 1
            raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000181
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO on top of the C io.RawIOBase."""
184
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO on top of the pure-Python _pyio.RawIOBase."""
187
188
class MockFileIO:
    """Mixin that logs the result size of every read()/readinto() call.

    Meant to be combined with a concrete stream class; the sizes are
    appended to ``read_history`` (None is logged when a read returns None).
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        res = super().read(n)
        self.read_history.append(None if res is None else len(res))
        return res

    def readinto(self, b):
        res = super().readinto(b)
        self.read_history.append(res)
        return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000204
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO on top of the C io.BytesIO."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000207
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO on top of the pure-Python _pyio.BytesIO."""
210
211
class MockUnseekableIO:
    """Mixin that makes a stream report itself as unseekable.

    The concrete subclass must provide an ``UnsupportedOperation``
    attribute naming the exception type to raise.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        raise self.UnsupportedOperation("not seekable")

    def tell(self, *args):
        raise self.UnsupportedOperation("not seekable")

    def truncate(self, *args):
        raise self.UnsupportedOperation("not seekable")
224
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """MockUnseekableIO on top of the C io.BytesIO."""
    UnsupportedOperation = io.UnsupportedOperation
227
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """MockUnseekableIO on top of the pure-Python _pyio.BytesIO."""
    UnsupportedOperation = pyio.UnsupportedOperation
230
231
class MockNonBlockWriterIO:
    """Writer mixin that can simulate a write that would block.

    Written data accumulates in ``_write_stack``.  After block_on(char),
    a write() whose data contains *char* is cut short just before it; a
    write() whose data starts with *char* returns None once and clears
    the blocker.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and empty the record."""
        s = b"".join(self._write_stack)
        self._write_stack[:] = []
        return s

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record *b*, honouring the blocker set by block_on().

        Returns the number of bytes recorded, or None to signal that the
        write would block.
        """
        b = bytes(b)
        if self._blocker_char:
            try:
                n = b.index(self._blocker_char)
            except ValueError:
                pass
            else:
                if n > 0:
                    # write data up to the first blocker
                    self._write_stack.append(b[:n])
                    return n
                else:
                    # cancel blocker and indicate would block
                    self._blocker_char = None
                    return None
        self._write_stack.append(b)
        return len(b)
275
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO on top of the C io.RawIOBase."""
    BlockingIOError = io.BlockingIOError
278
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO on top of the pure-Python _pyio.RawIOBase."""
    BlockingIOError = pyio.BlockingIOError
281
Guido van Rossuma9e20242007-03-08 00:43:48 +0000282
Guido van Rossum28524c72007-02-27 05:47:44 +0000283class IOTest(unittest.TestCase):
284
Neal Norwitze7789b12008-03-24 06:18:09 +0000285 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000286 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000287
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000288 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000289 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000290
Guido van Rossum28524c72007-02-27 05:47:44 +0000291 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000292 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000293 f.truncate(0)
294 self.assertEqual(f.tell(), 5)
295 f.seek(0)
296
297 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000298 self.assertEqual(f.seek(0), 0)
299 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000300 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000301 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000302 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000303 buffer = bytearray(b" world\n\n\n")
304 self.assertEqual(f.write(buffer), 9)
305 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000306 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000307 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000308 self.assertEqual(f.seek(-1, 2), 13)
309 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000310
Guido van Rossum87429772007-04-10 21:06:59 +0000311 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000312 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000313 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000314
Guido van Rossum9b76da62007-04-11 01:09:03 +0000315 def read_ops(self, f, buffered=False):
316 data = f.read(5)
317 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000318 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000319 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000320 self.assertEqual(bytes(data), b" worl")
321 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000322 self.assertEqual(f.readinto(data), 2)
323 self.assertEqual(len(data), 5)
324 self.assertEqual(data[:2], b"d\n")
325 self.assertEqual(f.seek(0), 0)
326 self.assertEqual(f.read(20), b"hello world\n")
327 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000328 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000329 self.assertEqual(f.seek(-6, 2), 6)
330 self.assertEqual(f.read(5), b"world")
331 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000332 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000333 self.assertEqual(f.seek(-6, 1), 5)
334 self.assertEqual(f.read(5), b" worl")
335 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000336 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000337 if buffered:
338 f.seek(0)
339 self.assertEqual(f.read(), b"hello world\n")
340 f.seek(6)
341 self.assertEqual(f.read(), b"world\n")
342 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000343 f.seek(0)
344 data = byteslike(5)
345 self.assertEqual(f.readinto1(data), 5)
346 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000347
Guido van Rossum34d69e52007-04-10 20:08:41 +0000348 LARGE = 2**31
349
Guido van Rossum53807da2007-04-10 19:01:47 +0000350 def large_file_ops(self, f):
351 assert f.readable()
352 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100353 try:
354 self.assertEqual(f.seek(self.LARGE), self.LARGE)
355 except (OverflowError, ValueError):
356 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000357 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000358 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000359 self.assertEqual(f.tell(), self.LARGE + 3)
360 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000361 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000362 self.assertEqual(f.tell(), self.LARGE + 2)
363 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000364 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000365 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000366 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
367 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000368 self.assertEqual(f.read(2), b"x")
369
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000370 def test_invalid_operations(self):
371 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000372 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000373 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000374 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000375 self.assertRaises(exc, fp.read)
376 self.assertRaises(exc, fp.readline)
377 with self.open(support.TESTFN, "wb", buffering=0) as fp:
378 self.assertRaises(exc, fp.read)
379 self.assertRaises(exc, fp.readline)
380 with self.open(support.TESTFN, "rb", buffering=0) as fp:
381 self.assertRaises(exc, fp.write, b"blah")
382 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000383 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000384 self.assertRaises(exc, fp.write, b"blah")
385 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000386 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000387 self.assertRaises(exc, fp.write, "blah")
388 self.assertRaises(exc, fp.writelines, ["blah\n"])
389 # Non-zero seeking from current or end pos
390 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
391 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000392
Martin Panter754aab22016-03-31 07:21:56 +0000393 def test_optional_abilities(self):
394 # Test for OSError when optional APIs are not supported
395 # The purpose of this test is to try fileno(), reading, writing and
396 # seeking operations with various objects that indicate they do not
397 # support these operations.
398
399 def pipe_reader():
400 [r, w] = os.pipe()
401 os.close(w) # So that read() is harmless
402 return self.FileIO(r, "r")
403
404 def pipe_writer():
405 [r, w] = os.pipe()
406 self.addCleanup(os.close, r)
407 # Guarantee that we can write into the pipe without blocking
408 thread = threading.Thread(target=os.read, args=(r, 100))
409 thread.start()
410 self.addCleanup(thread.join)
411 return self.FileIO(w, "w")
412
413 def buffered_reader():
414 return self.BufferedReader(self.MockUnseekableIO())
415
416 def buffered_writer():
417 return self.BufferedWriter(self.MockUnseekableIO())
418
419 def buffered_random():
420 return self.BufferedRandom(self.BytesIO())
421
422 def buffered_rw_pair():
423 return self.BufferedRWPair(self.MockUnseekableIO(),
424 self.MockUnseekableIO())
425
426 def text_reader():
427 class UnseekableReader(self.MockUnseekableIO):
428 writable = self.BufferedIOBase.writable
429 write = self.BufferedIOBase.write
430 return self.TextIOWrapper(UnseekableReader(), "ascii")
431
432 def text_writer():
433 class UnseekableWriter(self.MockUnseekableIO):
434 readable = self.BufferedIOBase.readable
435 read = self.BufferedIOBase.read
436 return self.TextIOWrapper(UnseekableWriter(), "ascii")
437
438 tests = (
439 (pipe_reader, "fr"), (pipe_writer, "fw"),
440 (buffered_reader, "r"), (buffered_writer, "w"),
441 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
442 (text_reader, "r"), (text_writer, "w"),
443 (self.BytesIO, "rws"), (self.StringIO, "rws"),
444 )
445 for [test, abilities] in tests:
446 if test is pipe_writer and not threading:
447 continue # Skip subtest that uses a background thread
448 with self.subTest(test), test() as obj:
449 readable = "r" in abilities
450 self.assertEqual(obj.readable(), readable)
451 writable = "w" in abilities
452 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000453
454 if isinstance(obj, self.TextIOBase):
455 data = "3"
456 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
457 data = b"3"
458 else:
459 self.fail("Unknown base class")
460
461 if "f" in abilities:
462 obj.fileno()
463 else:
464 self.assertRaises(OSError, obj.fileno)
465
466 if readable:
467 obj.read(1)
468 obj.read()
469 else:
470 self.assertRaises(OSError, obj.read, 1)
471 self.assertRaises(OSError, obj.read)
472
473 if writable:
474 obj.write(data)
475 else:
476 self.assertRaises(OSError, obj.write, data)
477
Martin Panter3ee147f2016-03-31 21:05:31 +0000478 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000479 pipe_reader, pipe_writer):
480 # Pipes seem to appear as seekable on Windows
481 continue
482 seekable = "s" in abilities
483 self.assertEqual(obj.seekable(), seekable)
484
Martin Panter754aab22016-03-31 07:21:56 +0000485 if seekable:
486 obj.tell()
487 obj.seek(0)
488 else:
489 self.assertRaises(OSError, obj.tell)
490 self.assertRaises(OSError, obj.seek, 0)
491
492 if writable and seekable:
493 obj.truncate()
494 obj.truncate(0)
495 else:
496 self.assertRaises(OSError, obj.truncate)
497 self.assertRaises(OSError, obj.truncate, 0)
498
Antoine Pitrou13348842012-01-29 18:36:34 +0100499 def test_open_handles_NUL_chars(self):
500 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300501 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100502
503 bytes_fn = bytes(fn_with_NUL, 'ascii')
504 with warnings.catch_warnings():
505 warnings.simplefilter("ignore", DeprecationWarning)
506 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100507
Guido van Rossum28524c72007-02-27 05:47:44 +0000508 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000509 with self.open(support.TESTFN, "wb", buffering=0) as f:
510 self.assertEqual(f.readable(), False)
511 self.assertEqual(f.writable(), True)
512 self.assertEqual(f.seekable(), True)
513 self.write_ops(f)
514 with self.open(support.TESTFN, "rb", buffering=0) as f:
515 self.assertEqual(f.readable(), True)
516 self.assertEqual(f.writable(), False)
517 self.assertEqual(f.seekable(), True)
518 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000519
Guido van Rossum87429772007-04-10 21:06:59 +0000520 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000521 with self.open(support.TESTFN, "wb") as f:
522 self.assertEqual(f.readable(), False)
523 self.assertEqual(f.writable(), True)
524 self.assertEqual(f.seekable(), True)
525 self.write_ops(f)
526 with self.open(support.TESTFN, "rb") as f:
527 self.assertEqual(f.readable(), True)
528 self.assertEqual(f.writable(), False)
529 self.assertEqual(f.seekable(), True)
530 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000531
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000532 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000533 with self.open(support.TESTFN, "wb") as f:
534 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
535 with self.open(support.TESTFN, "rb") as f:
536 self.assertEqual(f.readline(), b"abc\n")
537 self.assertEqual(f.readline(10), b"def\n")
538 self.assertEqual(f.readline(2), b"xy")
539 self.assertEqual(f.readline(4), b"zzy\n")
540 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000541 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000542 self.assertRaises(TypeError, f.readline, 5.3)
543 with self.open(support.TESTFN, "r") as f:
544 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000545
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300546 def test_readline_nonsizeable(self):
547 # Issue #30061
548 # Crash when readline() returns an object without __len__
549 class R(self.IOBase):
550 def readline(self):
551 return None
552 self.assertRaises((TypeError, StopIteration), next, R())
553
554 def test_next_nonsizeable(self):
555 # Issue #30061
556 # Crash when __next__() returns an object without __len__
557 class R(self.IOBase):
558 def __next__(self):
559 return None
560 self.assertRaises(TypeError, R().readlines, 1)
561
Guido van Rossum28524c72007-02-27 05:47:44 +0000562 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000563 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000564 self.write_ops(f)
565 data = f.getvalue()
566 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000567 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000568 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000569
Guido van Rossum53807da2007-04-10 19:01:47 +0000570 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000571 # On Windows and Mac OSX this test comsumes large resources; It takes
572 # a long time to build the >2GB file and takes >2GB of disk space
573 # therefore the resource must be enabled to run this test.
574 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600575 support.requires(
576 'largefile',
577 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000578 with self.open(support.TESTFN, "w+b", 0) as f:
579 self.large_file_ops(f)
580 with self.open(support.TESTFN, "w+b") as f:
581 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000582
583 def test_with_open(self):
584 for bufsize in (0, 1, 100):
585 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000586 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000587 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000588 self.assertEqual(f.closed, True)
589 f = None
590 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000591 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000592 1/0
593 except ZeroDivisionError:
594 self.assertEqual(f.closed, True)
595 else:
596 self.fail("1/0 didn't raise an exception")
597
Antoine Pitrou08838b62009-01-21 00:55:13 +0000598 # issue 5008
599 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000600 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000601 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000602 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000603 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000604 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000605 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000606 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300607 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000608
Guido van Rossum87429772007-04-10 21:06:59 +0000609 def test_destructor(self):
610 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000611 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000612 def __del__(self):
613 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000614 try:
615 f = super().__del__
616 except AttributeError:
617 pass
618 else:
619 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000620 def close(self):
621 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000622 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000623 def flush(self):
624 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000625 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000626 with support.check_warnings(('', ResourceWarning)):
627 f = MyFileIO(support.TESTFN, "wb")
628 f.write(b"xxx")
629 del f
630 support.gc_collect()
631 self.assertEqual(record, [1, 2, 3])
632 with self.open(support.TESTFN, "rb") as f:
633 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000634
635 def _check_base_destructor(self, base):
636 record = []
637 class MyIO(base):
638 def __init__(self):
639 # This exercises the availability of attributes on object
640 # destruction.
641 # (in the C version, close() is called by the tp_dealloc
642 # function, not by __del__)
643 self.on_del = 1
644 self.on_close = 2
645 self.on_flush = 3
646 def __del__(self):
647 record.append(self.on_del)
648 try:
649 f = super().__del__
650 except AttributeError:
651 pass
652 else:
653 f()
654 def close(self):
655 record.append(self.on_close)
656 super().close()
657 def flush(self):
658 record.append(self.on_flush)
659 super().flush()
660 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000661 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000662 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000663 self.assertEqual(record, [1, 2, 3])
664
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000665 def test_IOBase_destructor(self):
666 self._check_base_destructor(self.IOBase)
667
668 def test_RawIOBase_destructor(self):
669 self._check_base_destructor(self.RawIOBase)
670
671 def test_BufferedIOBase_destructor(self):
672 self._check_base_destructor(self.BufferedIOBase)
673
674 def test_TextIOBase_destructor(self):
675 self._check_base_destructor(self.TextIOBase)
676
Guido van Rossum87429772007-04-10 21:06:59 +0000677 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000678 with self.open(support.TESTFN, "wb") as f:
679 f.write(b"xxx")
680 with self.open(support.TESTFN, "rb") as f:
681 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000682
Guido van Rossumd4103952007-04-12 05:44:49 +0000683 def test_array_writes(self):
684 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000685 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000686 def check(f):
687 with f:
688 self.assertEqual(f.write(a), n)
689 f.writelines((a,))
690 check(self.BytesIO())
691 check(self.FileIO(support.TESTFN, "w"))
692 check(self.BufferedWriter(self.MockRawIO()))
693 check(self.BufferedRandom(self.MockRawIO()))
694 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000695
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000696 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000697 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000698 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000699
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000700 def test_read_closed(self):
701 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000702 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000703 with self.open(support.TESTFN, "r") as f:
704 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000705 self.assertEqual(file.read(), "egg\n")
706 file.seek(0)
707 file.close()
708 self.assertRaises(ValueError, file.read)
709
710 def test_no_closefd_with_filename(self):
711 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000712 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000713
714 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000715 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000716 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000717 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000718 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000719 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000720 self.assertEqual(file.buffer.raw.closefd, False)
721
def test_garbage_collection(self):
    # FileIO objects are collected, and collecting them flushes
    # all data to disk.
    with support.check_warnings(('', ResourceWarning)):
        raw = self.FileIO(support.TESTFN, "wb")
        raw.write(b"abcxxx")
        # Self-reference: only the cyclic collector can reclaim the object.
        raw.f = raw
        ref = weakref.ref(raw)
        del raw
        support.gc_collect()
    self.assertIsNone(ref(), ref)
    with self.open(support.TESTFN, "rb") as reader:
        self.assertEqual(reader.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000735
def test_unbounded_file(self):
    # Issue #1174606: reading from an unbounded stream such as /dev/zero.
    zero = "/dev/zero"
    if not os.path.exists(zero):
        self.skipTest("{0} does not exist".format(zero))
    if sys.maxsize > 0x7FFFFFFF:
        self.skipTest("test can only run in a 32-bit address space")
    if support.real_max_memuse < support._2G:
        self.skipTest("test requires at least 2GB of memory")
    # Same check for the raw, buffered and text layers, in that order.
    layers = (
        ("rb", {"buffering": 0}),
        ("rb", {}),
        ("r", {}),
    )
    for mode, extra in layers:
        with self.open(zero, mode, **extra) as stream:
            with self.assertRaises(OverflowError):
                stream.read()
751
def check_flush_error_on_close(self, *args, **kwargs):
    # Test that the file is closed despite failed flush
    # and that flush() is called before file closed.
    # *args/**kwargs are forwarded unchanged to self.open().
    stream = self.open(*args, **kwargs)
    seen_closed = []

    def failing_flush():
        # Record whether the stream was already closed when flush() ran.
        seen_closed[:] = [stream.closed]
        raise OSError()

    stream.flush = failing_flush
    with self.assertRaises(OSError):  # exception not swallowed
        stream.close()
    self.assertTrue(stream.closed)
    self.assertTrue(seen_closed)  # flush() called
    self.assertFalse(seen_closed[0])  # flush() called before file closed
    stream.flush = lambda: None  # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200766
def test_flush_error_on_close(self):
    # Issue #5700: io.FileIO calls flush() after file closed.
    #
    # The same three scenarios (open by name, open by owned fd, open by
    # borrowed fd) are run for the raw, buffered and text layers.
    flags = os.O_WRONLY | os.O_CREAT
    layers = (
        ('wb', {'buffering': 0}),   # raw file
        ('wb', {}),                 # buffered io
        ('w', {}),                  # text io
    )
    for mode, extra in layers:
        self.check_flush_error_on_close(support.TESTFN, mode, **extra)

        # closefd defaults to True: the file object closes the descriptor.
        fd = os.open(support.TESTFN, flags)
        self.check_flush_error_on_close(fd, mode, **extra)

        # closefd=False: the descriptor stays ours, so release it even
        # when the helper's assertions fail (previously it leaked).
        fd = os.open(support.TESTFN, flags)
        try:
            self.check_flush_error_on_close(fd, mode, closefd=False,
                                            **extra)
        finally:
            os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000790
def test_multi_close(self):
    # close() may be called repeatedly; flush() afterwards must fail.
    raw = self.open(support.TESTFN, "wb", buffering=0)
    for _ in range(3):
        raw.close()
    with self.assertRaises(ValueError):
        raw.flush()
797
def test_RawIOBase_read(self):
    # Exercise the default RawIOBase.read() implementation (which calls
    # readinto() internally).
    rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
    # Results of successive read(2) calls, in order.
    expected_reads = (b"ab", b"c", b"d", None, b"ef", b"g", None, b"")
    for expected in expected_reads:
        self.assertEqual(rawio.read(2), expected)
810
def test_types_have_dict(self):
    # Instances of these io types must carry an instance __dict__.
    instances = [
        self.IOBase(),
        self.RawIOBase(),
        self.TextIOBase(),
        self.StringIO(),
        self.BytesIO(),
    ]
    for instance in instances:
        self.assertTrue(hasattr(instance, "__dict__"))
821
def test_opener(self):
    # A custom opener supplies the descriptor, so the (bogus) file name
    # passed to open() is never used to locate the file.
    with self.open(support.TESTFN, "w") as writer:
        writer.write("egg\n")
    real_fd = os.open(support.TESTFN, os.O_RDONLY)

    def opener(path, flags):
        return real_fd

    with self.open("non-existent", "r", opener=opener) as reader:
        self.assertEqual(reader.read(), "egg\n")
830
def test_bad_opener_negative_1(self):
    # Issue #27066.
    # Note: this goes through the builtin open(), not self.open().
    def badopener(fname, flags):
        return -1

    with self.assertRaises(ValueError) as caught:
        open('non-existent', 'r', opener=badopener)
    message = str(caught.exception)
    self.assertEqual(message, 'opener returned -1')
838
def test_bad_opener_other_negative(self):
    # Issue #27066.
    # Note: this goes through the builtin open(), not self.open().
    def badopener(fname, flags):
        return -2

    with self.assertRaises(ValueError) as caught:
        open('non-existent', 'r', opener=badopener)
    message = str(caught.exception)
    self.assertEqual(message, 'opener returned -2')
846
def test_fileio_closefd(self):
    # Issue #4841
    with self.open(__file__, 'rb') as first, \
            self.open(__file__, 'rb') as second:
        borrowed = self.FileIO(first.fileno(), closefd=False)
        # .__init__() must not close first
        borrowed.__init__(second.fileno(), closefd=False)
        first.readline()
        # .close() must not close second
        borrowed.close()
        second.readline()
858
def test_nonbuffered_textio(self):
    # Text mode cannot be unbuffered; the failed open() must not leave
    # behind an object that triggers a ResourceWarning.
    with support.check_no_resource_warning(self), \
            self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300863
def test_invalid_newline(self):
    # An invalid newline argument is rejected, and the failed open()
    # must not leave behind an object that triggers a ResourceWarning.
    with support.check_no_resource_warning(self), \
            self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300868
def test_buffered_readinto_mixin(self):
    # Test the implementation provided by BufferedIOBase
    payload = b"12345"

    class Stream(self.BufferedIOBase):
        # Only read()/read1() are supplied; readinto()/readinto1() are
        # inherited from the base class.
        def read(self, size):
            return payload
        read1 = read

    stream = Stream()
    for method in ("readinto", "readinto1"):
        with self.subTest(method):
            target = byteslike(5)
            fill = getattr(stream, method)
            self.assertEqual(fill(target), 5)
            self.assertEqual(bytes(target), b"12345")
881
def test_fspath_support(self):
    # open() must accept objects implementing __fspath__().
    class PathLike:
        def __init__(self, path):
            self.path = path

        def __fspath__(self):
            return self.path

    def check_path_succeeds(path):
        # Round-trip a small payload through the given path object.
        with self.open(path, "w") as writer:
            writer.write("egg\n")

        with self.open(path, "r") as reader:
            self.assertEqual(reader.read(), "egg\n")

    # Both str and bytes results from __fspath__() are valid.
    for good in (support.TESTFN, support.TESTFN.encode('utf-8')):
        check_path_succeeds(PathLike(good))

    # __fspath__() returning something that is neither str nor bytes.
    with self.assertRaises(TypeError):
        self.open(PathLike(TypeError), 'w')

    # ensure that refcounting is correct with some error conditions
    with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
        self.open(PathLike(support.TESTFN), 'rwxa')
907
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200908
class CIOTest(IOTest):
    # IOTest plus checks that only apply to this variant.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        instance = MyIO()
        instance.obj = instance
        ref = weakref.ref(instance)
        # Drop the class and the instance so both are collected together.
        del MyIO
        del instance
        support.gc_collect()
        self.assertIsNone(ref(), ref)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000928
class PyIOTest(IOTest):
    """IOTest variant that adds no tests of its own."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000931
Guido van Rossuma9e20242007-03-08 00:43:48 +0000932
@support.cpython_only
class APIMismatchTest(unittest.TestCase):
    # Cross-checks the attribute sets of io.RawIOBase and pyio.RawIOBase.

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        missing = support.detect_api_mismatch(
            pyio.RawIOBase, io.RawIOBase, ignore=('__weakref__',))
        self.assertEqual(
            missing, set(),
            msg='Python RawIOBase does not have all C RawIOBase methods')

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        missing = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        self.assertEqual(
            missing, set(),
            msg='C RawIOBase does not have all Python RawIOBase methods')
946
947
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # Mixin: the methods below rely on ``self.tp`` (the buffered class under
    # test) and on the mock raw-stream factories reached through ``self``.

    def test_detach(self):
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        self.assertIs(buffered.detach(), raw)
        # A second detach() has nothing left to hand back.
        with self.assertRaises(ValueError):
            buffered.detach()

        repr(buffered)  # Should still work

    def test_fileno(self):
        buffered = self.tp(self.MockRawIO())

        # presumably 42 is what MockRawIO.fileno() returns -- defined
        # outside this class.
        self.assertEqual(42, buffered.fileno())

    def test_invalid_args(self):
        buffered = self.tp(self.MockRawIO())
        # Invalid whence
        for bad_whence in (-1, 9):
            with self.assertRaises(ValueError):
                buffered.seek(0, bad_whence)

    def test_override_destructor(self):
        # Expected call order on finalization: __del__ (1), close (2),
        # flush (3).
        calls = []

        class MyBufferedIO(self.tp):
            def __del__(self):
                calls.append(1)
                # The base class may or may not define __del__.
                parent_del = getattr(super(), "__del__", None)
                if parent_del is not None:
                    parent_del()

            def close(self):
                calls.append(2)
                super().close()

            def flush(self):
                calls.append(3)
                super().flush()

        raw = self.MockRawIO()
        buffered = MyBufferedIO(raw)
        del buffered
        support.gc_collect()
        self.assertEqual(calls, [1, 2, 3])

    def test_context_manager(self):
        # Test usability as a context manager
        buffered = self.tp(self.MockRawIO())

        def enter_and_leave():
            with buffered:
                pass

        enter_and_leave()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        with self.assertRaises(ValueError):
            enter_and_leave()

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        raw = self.CloseFailureIO()

        def touch_missing_attribute():
            # The temporary buffered object dies inside this frame.
            self.tp(raw).xyzzy

        with support.captured_output("stderr") as captured:
            self.assertRaises(AttributeError, touch_missing_attribute)
        output = captured.getvalue().strip()
        if not output:
            return
        # The destructor *may* have printed an unraisable error, check it
        self.assertEqual(len(output.splitlines()), 1)
        self.assertTrue(output.startswith("Exception OSError: "), output)
        self.assertTrue(output.endswith(" ignored"), output)

    def test_repr(self):
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
        # Without a name on the raw stream, only the class name is shown.
        self.assertEqual(repr(buffered), "<%s>" % clsname)
        # str and bytes names are both rendered with their own repr().
        raw.name = "dummy"
        self.assertEqual(repr(buffered), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(buffered), "<%s name=b'dummy'>" % clsname)

    def test_recursive_repr(self):
        # Issue #25455
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        # Make the raw stream's name point back at the buffered object.
        with support.swap_attr(raw, 'name', buffered):
            try:
                repr(buffered)  # Should not crash
            except RuntimeError:
                pass

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw = self.MockRawIO()
        seen_closed = []

        def failing_flush():
            # Snapshot both layers' closed state at the time of the flush.
            seen_closed[:] = [buffered.closed, raw.closed]
            raise OSError()

        raw.flush = failing_flush
        buffered = self.tp(raw)
        with self.assertRaises(OSError):  # exception not swallowed
            buffered.close()
        self.assertTrue(buffered.closed)
        self.assertTrue(raw.closed)
        self.assertTrue(seen_closed)  # flush() called
        self.assertFalse(seen_closed[0])  # flush() called before file closed
        self.assertFalse(seen_closed[1])
        raw.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and the raw close() fail, the close error is
        # raised with the flush error attached as its __context__.
        raw = self.MockRawIO()

        def failing_flush():
            raise OSError('flush')

        def failing_close():
            raise OSError('close')

        raw.close = failing_close
        buffered = self.tp(raw)
        buffered.flush = failing_flush
        with self.assertRaises(OSError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertEqual(error.args, ('close',))
        self.assertIsInstance(error.__context__, OSError)
        self.assertEqual(error.__context__.args, ('flush',))
        self.assertFalse(buffered.closed)

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        raw = self.MockRawIO()

        # Both callbacks fail with NameError by referencing undefined names.
        def failing_flush():
            raise non_existing_flush

        def failing_close():
            raise non_existing_close

        raw.close = failing_close
        buffered = self.tp(raw)
        buffered.flush = failing_flush
        with self.assertRaises(NameError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertIn('non_existing_close', str(error))
        self.assertIsInstance(error.__context__, NameError)
        self.assertIn('non_existing_flush', str(error.__context__))
        self.assertFalse(buffered.closed)

    def test_multi_close(self):
        # close() may be called repeatedly; flush() afterwards must fail.
        buffered = self.tp(self.MockRawIO())
        for _ in range(3):
            buffered.close()
        with self.assertRaises(ValueError):
            buffered.flush()

    def test_unseekable(self):
        buffered = self.tp(self.MockUnseekableIO(b"A" * 10))
        with self.assertRaises(self.UnsupportedOperation):
            buffered.tell()
        with self.assertRaises(self.UnsupportedOperation):
            buffered.seek(0)

    def test_readonly_attributes(self):
        # The ``raw`` attribute cannot be rebound.
        buffered = self.tp(self.MockRawIO())
        replacement = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buffered.raw = replacement
1113
Guido van Rossum78892e42007-04-06 17:31:18 +00001114
class SizeofTest:
    # Mixin checking how sys.getsizeof() accounts for the internal buffer.

    @support.cpython_only
    def test_sizeof(self):
        small, large = 4096, 8192
        # Size of the object with the buffer subtracted ...
        bufio = self.tp(self.MockRawIO(), buffer_size=small)
        base_size = sys.getsizeof(bufio) - small
        # ... must be the same whatever buffer size was requested.
        bufio = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(bufio), base_size + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        bufsize = 4096
        bufio = self.tp(self.MockRawIO(), buffer_size=bufsize)
        base_size = sys.getsizeof(bufio) - bufsize
        bufio.close()
        # After close() the buffer no longer counts towards the size.
        self.assertEqual(sys.getsizeof(bufio), base_size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001136
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Tests for the buffered reader class referenced by ``self.tp``.
    # ``read_mode`` is the mode test_threads uses to open the raw file.
    read_mode = "rb"

    def test_constructor(self):
        # __init__() may be called again on a live object; invalid buffer
        # sizes are rejected and a later valid __init__() still works.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                bufio.__init__(rawio, buffer_size=bad_size)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_uninitialized(self):
        # An object created through __new__() only must be destroyable and
        # must fail cleanly until __init__() has run.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        with self.assertRaisesRegex((ValueError, AttributeError),
                                    'uninitialized|has no attribute'):
            bufio.read(0)
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.read(0), b'')

    def test_read(self):
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        with self.assertRaises(ValueError):
            bufio.read(-2)

    def test_read1(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"", bufio.read1(0))
        # (expected data, expected cumulative raw reads) for read1(100)
        steps = ((b"c", 1), (b"d", 2), (b"efg", 3), (b"", 4))
        for expected, raw_reads in steps:
            self.assertEqual(expected, bufio.read1(100))
            self.assertEqual(rawio._reads, raw_reads)

    def test_read1_arbitrary(self):
        # read1() with no argument or a negative one.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"bc", bufio.read1())
        self.assertEqual(b"d", bufio.read1())
        self.assertEqual(b"efg", bufio.read1(-1))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1())
        self.assertEqual(rawio._reads, 4)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        # (bytes read, buffer contents afterwards); a short read leaves
        # the tail of the buffer untouched.
        steps = ((2, b"ab"), (2, b"cd"), (2, b"ef"), (1, b"gf"), (0, b"gf"))
        for count, contents in steps:
            self.assertEqual(bufio.readinto(b), count)
            self.assertEqual(b, contents)
        # The raw stream yields None after its first chunk.
        rawio = self.MockRawIO((b"abc", None))
        bufio = self.tp(rawio)
        for count, contents in ((2, b"ab"), (1, b"cb")):
            self.assertEqual(bufio.readinto(b), count)
            self.assertEqual(b, contents)

    def test_readinto1(self):
        buffer_size = 10
        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
        bufio = self.tp(rawio, buffer_size=buffer_size)
        b = bytearray(2)
        self.assertEqual(bufio.peek(3), b'abc')
        self.assertEqual(rawio._reads, 1)
        # Served from the buffer: no additional raw read.
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 1)
        self.assertEqual(b[:1], b"c")
        self.assertEqual(rawio._reads, 1)
        # Buffer exhausted: a further raw read is performed.
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"de")
        self.assertEqual(rawio._reads, 2)
        # Target larger than the internal buffer.
        b = bytearray(2*buffer_size)
        self.assertEqual(bufio.peek(3), b'fgh')
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(bufio.readinto1(b), 6)
        self.assertEqual(b[:6], b"fghjkl")
        self.assertEqual(rawio._reads, 4)

    def _check_readinto_array(self, method):
        # Shared body of the *_array tests; *method* is the name of the
        # buffered-reader method to call ("readinto" or "readinto1").
        buffer_size = 60
        data = b"a" * 26
        rawio = self.MockRawIO((data,))
        bufio = self.tp(rawio, buffer_size=buffer_size)

        # Create an array with element size > 1 byte
        b = array.array('i', b'x' * 32)
        # Sanity check as a real assertion so that it survives ``-O``.
        self.assertNotEqual(len(b), 16)

        # Read into it. We should get as many *bytes* as we can fit into b
        # (which is more than the number of elements)
        n = getattr(bufio, method)(b)
        self.assertGreater(n, len(b))

        # Check that old contents of b are preserved
        bm = memoryview(b).cast('B')
        self.assertLess(n, len(bm))
        self.assertEqual(bm[:n], data[:n])
        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))

    def test_readinto_array(self):
        self._check_readinto_array("readinto")

    def test_readinto1_array(self):
        self._check_readinto_array("readinto1")

    def test_readlines(self):
        def bufio():
            # Fresh reader over the same three chunks for every call.
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes passed to the buffered read(),
        # sizes expected in the raw stream's read_history.
        tests = [
            [100, [3, 1, 4, 8], [dlen, 0]],
            [100, [3, 3, 3], [dlen]],
            [4, [1, 2, 4, 2], [4, 4, 1]],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as f:
                f.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []

                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=reader) for x in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02)  # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                # Every byte value must have been read exactly N times.
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(combined.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        # Same outcome once some data has been read.
        bufio.read(1)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        self.assertRaises(self.UnsupportedOperation, bufio.tell)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))

    def test_read_on_closed(self):
        # Issue #23796
        # Note: this uses io.BufferedReader directly rather than self.tp.
        b = io.BufferedReader(io.BytesIO(b"12"))
        b.read(1)
        b.close()
        self.assertRaises(ValueError, b.peek)
        self.assertRaises(ValueError, b.read1, 1)
1422
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001423
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the reader tests against io.BufferedReader and adds checks
    # specific to it.
    tp = io.BufferedReader

    def test_constructor(self):
        super().test_constructor()
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            raw = self.MockRawIO()
            reader = self.tp(raw)
            with self.assertRaises((OverflowError, MemoryError, ValueError)):
                reader.__init__(raw, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialization, read() must raise ValueError.
        raw = self.MockRawIO([b"abc"])
        reader = self.tp(raw)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                reader.__init__(raw, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                reader.read()

    def test_misbehaved_io_read(self):
        raw = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        with self.assertRaises(OSError):
            reader.read(10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        with support.check_warnings(('', ResourceWarning)):
            raw = self.FileIO(support.TESTFN, "w+b")
            reader = self.tp(raw)
            # Self-reference: only the cyclic collector can reclaim it.
            reader.f = reader
            ref = weakref.ref(reader)
            del reader
            support.gc_collect()
        self.assertIsNone(ref(), ref)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1470
1471
class PyBufferedReaderTest(BufferedReaderTest):
    # Same suite as BufferedReaderTest, bound to the pyio implementation.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001474
Guido van Rossuma9e20242007-03-08 00:43:48 +00001475
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Tests for buffered writers.  ``self.tp`` is the class under test; the
    # subclasses in this file bind it to a concrete implementation.
    write_mode = "wb"

    def test_constructor(self):
        # Re-running __init__ with valid sizes keeps the object usable;
        # invalid sizes raise ValueError.
        raw = self.MockRawIO()
        writer = self.tp(raw)
        writer.__init__(raw)
        for good_size in (1024, 16):
            writer.__init__(raw, buffer_size=good_size)
        self.assertEqual(3, writer.write(b"abc"))
        writer.flush()
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, writer.__init__, raw,
                              buffer_size=bad_size)
        writer.__init__(raw)
        self.assertEqual(3, writer.write(b"ghi"))
        writer.flush()
        self.assertEqual(b"".join(raw._write_stack), b"abcghi")

    def test_uninitialized(self):
        # Creating and immediately dropping an uninitialized object must
        # be harmless.
        writer = self.tp.__new__(self.tp)
        del writer
        writer = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               writer.write, b'')
        writer.__init__(self.MockRawIO())
        self.assertEqual(writer.write(b''), 0)

    def test_detach_flush(self):
        # detach() must push buffered data down to the raw stream.
        raw = self.MockRawIO()
        writer = self.tp(raw)
        writer.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        writer.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        raw = self.MockRawIO()
        writer = self.tp(raw, 8)
        writer.write(b"abc")
        self.assertFalse(raw._write_stack)
        data = bytearray(b"def")
        writer.write(data)
        data[:] = b"***"  # Overwrite our copy of the data
        writer.flush()
        self.assertEqual(b"".join(raw._write_stack), b"abcdef")

    def test_write_overflow(self):
        raw = self.MockRawIO()
        writer = self.tp(raw, 8)
        contents = b"abcdefghijklmnop"
        for start in range(0, len(contents), 3):
            writer.write(contents[start:start + 3])
        flushed = b"".join(raw._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is called after every single write.
        contents = bytes(range(256)) * 1000
        total = len(contents)
        raw = self.MockRawIO()
        bufio = self.tp(raw, 13)
        # Write sizes: repeat each N 15 times then proceed to N+1.
        sizes = (size for size in count(1) for _ in range(15))
        pos = 0
        while pos < total:
            size = min(next(sizes), total - pos)
            self.assertEqual(bufio.write(contents[pos:pos + size]), size)
            intermediate_func(bufio)
            pos += size
        bufio.flush()
        self.assertEqual(contents, b"".join(raw._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        def _seekabs(bufio):
            # Absolute seeks around the current position, then back to it.
            pos = bufio.tell()
            for target in (pos + 1, pos - 1, pos):
                bufio.seek(target, 0)

        def _seekrel(bufio):
            # Relative seeks forth and back, then restore the position.
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)

        for seeker in (_seekabs, _seekrel):
            self.check_writes(seeker)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as exc:
            written = exc.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
                         b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        flushed = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(flushed.startswith(b"01234567A"), flushed)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        raw = self.MockRawIO()
        bufio = self.tp(raw, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", raw._write_stack[0])

    def test_writelines(self):
        lines = [b'ab', b'cd', b'ef']
        raw = self.MockRawIO()
        bufio = self.tp(raw, 8)
        bufio.writelines(lines)
        bufio.flush()
        self.assertEqual(b''.join(raw._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # Same as test_writelines, but with a UserList instead of a list.
        lines = UserList([b'ab', b'cd', b'ef'])
        raw = self.MockRawIO()
        bufio = self.tp(raw, 8)
        bufio.writelines(lines)
        bufio.flush()
        self.assertEqual(b''.join(raw._write_stack), b'abcdef')

    def test_writelines_error(self):
        raw = self.MockRawIO()
        bufio = self.tp(raw, 8)
        for bad_lines in ([1, 2, 3], None, 'abc'):
            self.assertRaises(TypeError, bufio.writelines, bad_lines)

    def test_destructor(self):
        # Dropping the last reference must flush buffered data.
        raw = self.MockRawIO()
        bufio = self.tp(raw, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", raw._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        #
        # The test creates support.TESTFN on disk; register its removal up
        # front so the file does not outlive the test (test_threads below
        # unlinks the same path for the same reason).
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            pos = 0
            queue = deque()
            while pos < len(contents):
                size = next(sizes)
                queue.append(contents[pos:pos + size])
                pos += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []

                def worker():
                    # Drain the shared queue; record any failure so the
                    # main thread can report it.
                    try:
                        while True:
                            try:
                                chunk = queue.popleft()
                            except IndexError:
                                return
                            bufio.write(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise

                threads = [threading.Thread(target=worker) for _ in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02)  # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                data = f.read()
            # Write order across threads is arbitrary, so only the number
            # of occurrences of each byte value is checked.
            for i in range(256):
                self.assertEqual(data.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)
        self.assertRaises(OSError, bufio.write, b"abcdef")

    def test_max_buffer_size_removal(self):
        # A third positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()

        def bad_write(b):
            raise OSError()

        raw.write = bad_write
        writer = self.tp(raw)
        writer.write(b'spam')
        self.assertRaises(OSError, writer.close)  # exception not swallowed
        self.assertTrue(writer.closed)
1735
Benjamin Peterson59406a92009-03-26 17:10:29 +00001736
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared BufferedWriterTest suite against io.BufferedWriter
    # and adds checks that only apply to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                              bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # Each rejected buffer size must make __init__ raise ValueError,
        # and a write() attempted afterwards must raise ValueError too.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # The test creates support.TESTFN on disk; register its removal up
        # front so the file does not outlive the test, whether it passes
        # or fails.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            f.write(b"123xxx")
            # Reference cycle: only the cyclic garbage collector can
            # reclaim the object.
            f.x = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275: the TypeError raised for too many arguments must
        # mention the class name.
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1780
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001781
class PyBufferedWriterTest(BufferedWriterTest):
    # Same suite as BufferedWriterTest, bound to the pyio implementation.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001784
class BufferedRWPairTest(unittest.TestCase):
    # Tests for BufferedRWPair.  ``self.tp`` is the class under test; the
    # subclasses in this file bind it to a concrete implementation.

    def test_constructor(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.closed)

    def test_uninitialized(self):
        # Creating and immediately dropping an uninitialized object must
        # be harmless.
        rw_pair = self.tp.__new__(self.tp)
        del rw_pair
        rw_pair = self.tp.__new__(self.tp)
        error_types = (ValueError, AttributeError)
        self.assertRaisesRegex(error_types,
                               'uninitialized|has no attribute',
                               rw_pair.read, 0)
        self.assertRaisesRegex(error_types,
                               'uninitialized|has no attribute',
                               rw_pair.write, b'')
        rw_pair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(rw_pair.read(0), b'')
        self.assertEqual(rw_pair.write(b''), 0)

    def test_detach(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        with self.assertRaises(self.UnsupportedOperation):
            rw_pair.detach()

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
        for args, expected in (((3,), b"abc"), ((1,), b"d"), ((), b"ef")):
            self.assertEqual(rw_pair.read(*args), expected)

        rw_pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(rw_pair.read(None), b"abc")

    def test_readlines(self):
        def fresh_pair():
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

        all_lines = [b"abc\n", b"def\n", b"h"]
        self.assertEqual(fresh_pair().readlines(), all_lines)
        self.assertEqual(fresh_pair().readlines(), all_lines)
        self.assertEqual(fresh_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(rw_pair.read1(3), b"abc")
        self.assertEqual(rw_pair.read1(), b"def")

    def test_readinto(self):
        for method in ("readinto", "readinto1"):
            with self.subTest(method):
                rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
                fill = getattr(rw_pair, method)

                target = byteslike(b'\0' * 5)
                self.assertEqual(fill(target), 5)
                self.assertEqual(bytes(target), b"abcde")

    def test_write(self):
        sink = self.MockRawIO()
        rw_pair = self.tp(self.MockRawIO(), sink)

        rw_pair.write(b"abc")
        rw_pair.flush()
        data = bytearray(b"def")
        rw_pair.write(data)
        data[:] = b"***"  # Overwrite our copy of the data
        rw_pair.flush()
        self.assertEqual(sink._write_stack, [b"abc", b"def"])

    def test_peek(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        peeked = rw_pair.peek(3)
        self.assertTrue(peeked.startswith(b"abc"))
        self.assertEqual(rw_pair.read(3), b"abc")

    def test_readable(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw_pair.readable())

    def test_writeable(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw_pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.closed)
        rw_pair.close()
        self.assertTrue(rw_pair.closed)

    def test_reader_close_error_on_close(self):
        def reader_close():
            # Evaluating an undefined name raises NameError on purpose.
            reader_non_existing

        reader = self.MockRawIO()
        reader.close = reader_close
        writer = self.MockRawIO()
        rw_pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            rw_pair.close()
        self.assertIn('reader_non_existing', str(caught.exception))
        self.assertTrue(rw_pair.closed)
        self.assertFalse(reader.closed)
        self.assertTrue(writer.closed)

    def test_writer_close_error_on_close(self):
        def writer_close():
            # Evaluating an undefined name raises NameError on purpose.
            writer_non_existing

        reader = self.MockRawIO()
        writer = self.MockRawIO()
        writer.close = writer_close
        rw_pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            rw_pair.close()
        self.assertIn('writer_non_existing', str(caught.exception))
        self.assertFalse(rw_pair.closed)
        self.assertTrue(reader.closed)
        self.assertFalse(writer.closed)

    def test_reader_writer_close_error_on_close(self):
        def reader_close():
            # Evaluating an undefined name raises NameError on purpose.
            reader_non_existing

        def writer_close():
            # Evaluating an undefined name raises NameError on purpose.
            writer_non_existing

        reader = self.MockRawIO()
        reader.close = reader_close
        writer = self.MockRawIO()
        writer.close = writer_close
        rw_pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            rw_pair.close()
        # The reader's error is the one raised; the writer's error is
        # expected as its __context__.
        raised = caught.exception
        self.assertIn('reader_non_existing', str(raised))
        self.assertIsInstance(raised.__context__, NameError)
        self.assertIn('writer_non_existing', str(raised.__context__))
        self.assertFalse(rw_pair.closed)
        self.assertFalse(reader.closed)
        self.assertFalse(writer.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # (reader is a tty, writer is a tty, expected pair.isatty())
        cases = (
            (False, False, False),
            (True, False, True),
            (False, True, True),
            (True, True, True),
        )
        for reader_tty, writer_tty, expected in cases:
            rw_pair = self.tp(SelectableIsAtty(reader_tty),
                              SelectableIsAtty(writer_tty))
            check = self.assertTrue if expected else self.assertFalse
            check(rw_pair.isatty())

    def test_weakref_clearing(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        ref = weakref.ref(rw_pair)
        rw_pair = None
        ref = None  # Shouldn't segfault.
1973
class CBufferedRWPairTest(BufferedRWPairTest):
    # Same suite as BufferedRWPairTest, bound to the io implementation.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001976
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Same suite as BufferedRWPairTest, bound to the pyio implementation.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001979
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001980
1981class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1982 read_mode = "rb+"
1983 write_mode = "wb+"
1984
def test_constructor(self):
    # Run the constructor checks of both parent suites, reader first.
    for parent in (BufferedReaderTest, BufferedWriterTest):
        parent.test_constructor(self)
1988
def test_uninitialized(self):
    # Run the uninitialized-object checks of both parent suites, reader
    # first.
    for parent in (BufferedReaderTest, BufferedWriterTest):
        parent.test_uninitialized(self)
1992
def test_read_and_write(self):
    # Writes issued between reads stay buffered until the next read.
    backing = self.MockRawIO((b"asdf", b"ghjk"))
    stream = self.tp(backing, 8)

    self.assertEqual(b"as", stream.read(2))
    for piece in (b"ddd", b"eee"):
        stream.write(piece)
    self.assertFalse(backing._write_stack)  # Buffer writes
    self.assertEqual(b"ghjk", stream.read())
    first_raw_write = backing._write_stack[0]
    self.assertEqual(b"dddeee", first_raw_write)
Guido van Rossum01a27522007-03-07 01:00:12 +00002003
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002004 def test_seek_and_tell(self):
2005 raw = self.BytesIO(b"asdfghjkl")
2006 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00002007
Ezio Melottib3aedd42010-11-20 19:04:17 +00002008 self.assertEqual(b"as", rw.read(2))
2009 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002010 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002011 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00002012
Antoine Pitroue05565e2011-08-20 14:39:23 +02002013 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00002014 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02002015 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002016 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002017 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002018 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002019 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002020 self.assertEqual(7, rw.tell())
2021 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02002022 rw.flush()
2023 self.assertEqual(b"asdf123fl", raw.getvalue())
2024
Christian Heimes8e42a0a2007-11-08 18:04:45 +00002025 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00002026
def check_flush_and_read(self, read_func):
    # Shared driver for the test_flush_and_* tests.  read_func is called
    # as read_func(bufio, n) or read_func(bufio) and its return value is
    # compared with the expected bytes.
    backing = self.BytesIO(b"abcdefghi")
    stream = self.tp(backing)

    self.assertEqual(read_func(stream, 2), b"ab")
    stream.write(b"12")
    self.assertEqual(read_func(stream, 2), b"ef")
    self.assertEqual(stream.tell(), 6)
    stream.flush()
    self.assertEqual(stream.tell(), 6)
    self.assertEqual(read_func(stream), b"ghi")
    # Change the raw stream directly, bypassing the buffered object.
    backing.seek(0, 0)
    backing.write(b"XYZ")
    # flush() resets the read buffer
    stream.flush()
    stream.seek(0, 0)
    self.assertEqual(read_func(stream, 3), b"XYZ")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002044
def test_flush_and_read(self):
    # Drive check_flush_and_read() through bufio.read().
    def _read(bufio, *args):
        return bufio.read(*args)
    self.check_flush_and_read(_read)
2047
def test_flush_and_readinto(self):
    # Drive check_flush_and_read() through bufio.readinto().
    def _readinto(bufio, n=-1):
        # A negative n means "no size given": use a large scratch buffer.
        capacity = n if n >= 0 else 9999
        scratch = bytearray(capacity)
        filled = bufio.readinto(scratch)
        return bytes(scratch[:filled])
    self.check_flush_and_read(_readinto)
2054
def test_flush_and_peek(self):
    # Drive check_flush_and_read() through bufio.peek().
    def _peek(bufio, n=-1):
        # This relies on the fact that the buffer can contain the whole
        # raw stream, otherwise peek() can return less.
        peeked = bufio.peek(n)
        if n != -1:
            peeked = peeked[:n]
        # peek() is followed by an explicit relative seek past the bytes
        # being returned, so the helper behaves like a read.
        bufio.seek(len(peeked), 1)
        return peeked
    self.check_flush_and_read(_peek)
2065
2066 def test_flush_and_write(self):
2067 raw = self.BytesIO(b"abcdefghi")
2068 bufio = self.tp(raw)
2069
2070 bufio.write(b"123")
2071 bufio.flush()
2072 bufio.write(b"45")
2073 bufio.flush()
2074 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002075 self.assertEqual(b"12345fghi", raw.getvalue())
2076 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002077
def test_threads(self):
    # Run the threading tests of both parent suites, reader first.
    for parent in (BufferedReaderTest, BufferedWriterTest):
        parent.test_threads(self)
2081
def test_writes_and_peek(self):
    # Interleave every write of check_writes() with a peek().
    def _peek_here(bufio):
        bufio.peek(1)

    def _peek_one_back(bufio):
        # Step back one byte, peek, then restore the original position.
        saved = bufio.tell()
        bufio.seek(-1, 1)
        bufio.peek(1)
        bufio.seek(saved, 0)

    self.check_writes(_peek_here)
    self.check_writes(_peek_one_back)
2092
def test_writes_and_reads(self):
    # After every write, step back one byte and read it again.
    def _reread_last_byte(bufio):
        bufio.seek(-1, 1)
        bufio.read(1)
    self.check_writes(_reread_last_byte)
2098
def test_writes_and_read1s(self):
    # After every write, step back one byte and read1() it again.
    def _reread_last_byte(bufio):
        bufio.seek(-1, 1)
        bufio.read1(1)
    self.check_writes(_reread_last_byte)
2104
def test_writes_and_readintos(self):
    # After every write, step back one byte and readinto() a one-byte
    # buffer.
    def _reread_last_byte(bufio):
        bufio.seek(-1, 1)
        one_byte = bytearray(1)
        bufio.readinto(one_byte)
    self.check_writes(_reread_last_byte)
2110
def test_write_after_readahead(self):
    # Issue #6629: writing after the buffer was filled by readahead should
    # first rewind the raw stream.
    for nbytes in (1, 5):
        backing = self.BytesIO(b"A" * 10)
        stream = self.tp(backing, 4)
        # Trigger readahead
        self.assertEqual(stream.read(1), b"A")
        self.assertEqual(stream.tell(), 1)
        # Overwriting should rewind the raw stream if it needs so
        stream.write(b"B" * nbytes)
        position_after_write = nbytes + 1
        self.assertEqual(stream.tell(), position_after_write)
        # If the write size was smaller than the buffer size, flush() and
        # check that rewind happens.
        stream.flush()
        self.assertEqual(stream.tell(), position_after_write)
        expected = b"A" + b"B" * nbytes + b"A" * (9 - nbytes)
        self.assertEqual(backing.getvalue(), expected)
2130
def test_write_rewind_write(self):
    # Various combinations of reading / writing / seeking backwards / writing again
    def mutate(stream, start, stop):
        assert stop >= start
        # Fill the buffer
        stream.seek(start)
        stream.read(stop - start)
        stream.write(b'\x02')
        # This writes earlier than the previous write, but still inside
        # the buffer.
        stream.seek(start)
        stream.write(b'\x01')

    original = b"\x80\x81\x82\x83\x84"
    size = len(original)
    for lo in range(size):
        for hi in range(lo, size):
            backing = self.BytesIO(original)
            stream = self.tp(backing, 100)
            mutate(stream, lo, hi)
            stream.flush()
            # The second write lands last, so when lo == hi the byte is 1.
            expected = bytearray(original)
            expected[hi] = 2
            expected[lo] = 1
            self.assertEqual(backing.getvalue(), expected,
                             "failed result for i=%d, j=%d" % (lo, hi))
2156
def test_truncate_after_read_or_write(self):
    # truncate() with no argument must report the logical position after
    # a buffered read and again after a buffered write.
    stream = self.tp(self.BytesIO(b"A" * 10), 100)
    self.assertEqual(stream.read(2), b"AA")  # the read buffer gets filled
    self.assertEqual(stream.truncate(), 2)
    self.assertEqual(stream.write(b"BB"), 2)  # the write buffer increases
    self.assertEqual(stream.truncate(), 4)
2164
def test_misbehaved_io(self):
    # Run the reader suite's and then the writer suite's misbehaved-I/O
    # test against this test case.
    for suite in (BufferedReaderTest, BufferedWriterTest):
        suite.test_misbehaved_io(self)
2168
def test_interleaved_read_write(self):
    # Test for issue #12213
    with self.BytesIO(b'abcdefgh') as backing, \
            self.tp(backing, 100) as stream:
        stream.write(b"1")
        self.assertEqual(stream.read(1), b'b')
        stream.write(b'2')
        self.assertEqual(stream.read1(1), b'd')
        stream.write(b'3')
        one_byte = bytearray(1)
        stream.readinto(one_byte)
        self.assertEqual(one_byte, b'f')
        stream.write(b'4')
        self.assertEqual(stream.peek(1), b'h')
        stream.flush()
        self.assertEqual(backing.getvalue(), b'1b2d3f4h')

    with self.BytesIO(b'abc') as backing, \
            self.tp(backing, 100) as stream:
        self.assertEqual(stream.read(1), b'a')
        stream.write(b"2")
        self.assertEqual(stream.read(1), b'c')
        stream.flush()
        self.assertEqual(backing.getvalue(), b'a2c')
2193
def test_interleaved_readline_write(self):
    # Alternate one-byte writes with readline() calls; each write replaces
    # the first byte of the line that the following readline() finishes.
    with self.BytesIO(b'ab\ncdef\ng\n') as backing, \
            self.tp(backing) as stream:
        steps = (
            (b'1', b'b\n'),
            (b'2', b'def\n'),
            (b'3', b'\n'),
        )
        for written, rest_of_line in steps:
            stream.write(written)
            self.assertEqual(stream.readline(), rest_of_line)
        stream.flush()
        self.assertEqual(backing.getvalue(), b'1b\n2def\n3\n')
2205
# You can't construct a BufferedRandom over a non-seekable stream, so the
# name is bound to None here (presumably to disable a test of the same name
# defined on a base class -- confirm against the class header).
test_unseekable = None
2208
R David Murray67bfe802013-02-23 21:51:05 -05002209
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    """Run the BufferedRandom tests against ``io.BufferedRandom``."""

    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors,
                          buffered.__init__, raw, sys.maxsize)

    def test_garbage_collection(self):
        # Reuse the C reader and C writer garbage-collection tests.
        for suite in (CBufferedReaderTest, CBufferedWriterTest):
            suite.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        extra_args = (1024, 1024, 1024)
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), *extra_args)
2231
2232
class PyBufferedRandomTest(BufferedRandomTest):
    """Run the BufferedRandom tests against ``pyio.BufferedRandom``."""

    tp = pyio.BufferedRandom
2235
2236
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002237# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2238# properties:
2239# - A single output character can correspond to many bytes of input.
2240# - The number of input bytes to complete the character can be
2241# undetermined until the last input byte is received.
2242# - The number of input bytes can vary depending on previous input.
2243# - A single input byte can correspond to many characters of output.
2244# - The number of output characters can be undetermined until the
2245# last input byte is received.
2246# - The number of output characters can vary depending on previous input.
2247
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  A new value of I applies to the bytes
    that follow the word that set it.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        super().__init__(errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Restore the initial state: I = O = 1 and an empty word buffer."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return ``(buffered_bytes, flags)`` with I and O packed into flags."""
        i, o = self.i ^ 1, self.o ^ 1  # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input* and return the text of every completed word.

        If *final* is true, a partially buffered word is completed too.
        """
        pieces = []
        period = ord('.')
        for b in input:
            # self.i is re-read on every byte because process_word() may
            # change it in the middle of the input.
            if self.i == 0:  # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return its output ('' for i/o words)."""
        output = ''
        first = self.buffer[0]
        if first == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif first == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if self.o:
                # pad out with hyphens, then truncate to the output length
                output = output.ljust(self.o, '-')[:self.o]
            output += '.'
        self.buffer = bytearray()
        return output

    # The codec lookup only answers while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: return a CodecInfo for 'test_decoder'.

        Returns None when the codec is disabled or *name* is anything else.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
2332
# Register the lookup function of the decoder defined above for testing.
# It is disabled by default (codecEnabled is False); tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2336
2337
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.'),
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for raw, at_eof, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(raw, at_eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002380
2381class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002382
def setUp(self):
    # Sample data with mixed line endings, plus its "\n"-only text form.
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = "AAA\nBBB\nCCC\nDDD\nEEE\n"
    # Start every test without a leftover scratch file.
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002387
def tearDown(self):
    # Remove the scratch file a test may have created.
    scratch = support.TESTFN
    support.unlink(scratch)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002390
def test_constructor(self):
    # Re-running __init__ on an existing wrapper must apply the new
    # settings, and bad newline arguments must be rejected.
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    wrapper = self.TextIOWrapper(buffered)

    wrapper.__init__(buffered, encoding="latin-1", newline="\r\n")
    self.assertEqual(wrapper.encoding, "latin-1")
    self.assertEqual(wrapper.line_buffering, False)

    wrapper.__init__(buffered, encoding="utf-8", line_buffering=True)
    self.assertEqual(wrapper.encoding, "utf-8")
    self.assertEqual(wrapper.line_buffering, True)
    self.assertEqual("\xe9\n", wrapper.readline())

    self.assertRaises(TypeError, wrapper.__init__, buffered, newline=42)
    self.assertRaises(ValueError, wrapper.__init__, buffered,
                      newline='xyzzy')
2404
def test_uninitialized(self):
    wrapper_type = self.TextIOWrapper
    # Creating and dropping an object that never ran __init__ must work.
    wrapper = wrapper_type.__new__(wrapper_type)
    del wrapper
    wrapper = wrapper_type.__new__(wrapper_type)
    self.assertRaises(Exception, repr, wrapper)
    self.assertRaisesRegex((ValueError, AttributeError),
                           'uninitialized|has no attribute',
                           wrapper.read, 0)
    # Once initialized, the same object must be usable.
    wrapper.__init__(self.MockRawIO())
    self.assertEqual(wrapper.read(0), '')
2415
def test_non_text_encoding_codecs_are_rejected(self):
    # Ensure the constructor complains if passed a codec that isn't
    # marked as a text encoding
    # http://bugs.python.org/issue20404
    buffered = self.BufferedWriter(self.BytesIO())
    expected_message = "is not a text encoding"
    with self.assertRaisesRegex(LookupError, expected_message):
        self.TextIOWrapper(buffered, encoding="hex")
2424
def test_detach(self):
    sink = self.BytesIO()
    buffered = self.BufferedWriter(sink)
    wrapper = self.TextIOWrapper(buffered)
    # detach() hands back the underlying buffer object.
    self.assertIs(wrapper.detach(), buffered)

    wrapper = self.TextIOWrapper(buffered, encoding="ascii")
    wrapper.write("howdy")
    # Nothing has reached the raw stream before the wrapper is detached.
    self.assertFalse(sink.getvalue())
    wrapper.detach()
    self.assertEqual(sink.getvalue(), b"howdy")
    # A second detach() is an error.
    self.assertRaises(ValueError, wrapper.detach)

    # Operations independent of the detached stream should still work
    repr(wrapper)
    self.assertEqual(wrapper.encoding, "ascii")
    self.assertEqual(wrapper.errors, "strict")
    self.assertFalse(wrapper.line_buffering)
2443
def test_repr(self):
    raw = self.BytesIO(b"hello")
    buffered = self.BufferedReader(raw)
    wrapper = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    def check(template):
        # Compare repr() with the template filled in with the module name.
        self.assertEqual(repr(wrapper), template % modname)

    check("<%s.TextIOWrapper encoding='utf-8'>")
    raw.name = "dummy"
    check("<%s.TextIOWrapper name='dummy' encoding='utf-8'>")
    wrapper.mode = "r"
    check("<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>")
    raw.name = b"dummy"
    check("<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>")

    wrapper.buffer.detach()
    repr(wrapper)  # Should not raise an exception
2463
def test_recursive_repr(self):
    # Issue #25455
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw)
    # Make the raw stream's name point back at the wrapper itself.
    with support.swap_attr(raw, 'name', wrapper):
        try:
            repr(wrapper)  # Should not crash
        except RuntimeError:
            # A RuntimeError is an acceptable outcome here.
            pass
2473
def test_line_buffering(self):
    sink = self.BytesIO()
    buffered = self.BufferedWriter(sink, 1000)
    wrapper = self.TextIOWrapper(buffered, newline="\n",
                                 line_buffering=True)
    # (text written, bytes expected in the raw stream afterwards)
    steps = (
        ("X", b""),                 # No flush happened
        ("Y\nZ", b"XY\nZ"),         # All got flushed
        ("A\rB", b"XY\nZA\rB"),
    )
    for text, flushed in steps:
        wrapper.write(text)
        self.assertEqual(sink.getvalue(), flushed)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002484
def test_default_encoding(self):
    saved_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding to check that TextIOWrapper() uses the current
        # locale encoding and not the user preferred encoding
        for name in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(name, None)

        expected = locale.getpreferredencoding(False)
        wrapper = self.TextIOWrapper(self.BytesIO())
        self.assertEqual(wrapper.encoding, expected)
    finally:
        # Put the environment back exactly as it was.
        os.environ.clear()
        os.environ.update(saved_environ)
2502
@support.cpython_only
def test_device_encoding(self):
    # Issue 15989
    import _testcapi
    raw = self.BytesIO()
    # A fileno() result just past INT_MAX, then just past UINT_MAX, must
    # make the constructor raise OverflowError.
    for limit_name in ("INT_MAX", "UINT_MAX"):
        too_big = getattr(_testcapi, limit_name) + 1
        raw.fileno = lambda value=too_big: value
        self.assertRaises(OverflowError, self.TextIOWrapper, raw)
2512
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="utf-8")
    self.assertEqual(wrapper.encoding, "utf-8")
    # Without an explicit encoding some default must still be reported,
    # and it must name a codec that can be looked up.
    wrapper = self.TextIOWrapper(raw)
    default_encoding = wrapper.encoding
    self.assertIsNotNone(default_encoding)
    codecs.lookup(default_encoding)
2521
def test_encoding_errors_reading(self):
    payload = b"abc\n\xff\n"  # \xff is not valid ASCII
    # (1) default
    reader = self.TextIOWrapper(self.BytesIO(payload), encoding="ascii")
    self.assertRaises(UnicodeError, reader.read)
    # (2) explicit strict
    reader = self.TextIOWrapper(self.BytesIO(payload), encoding="ascii",
                                errors="strict")
    self.assertRaises(UnicodeError, reader.read)
    # (3) ignore
    reader = self.TextIOWrapper(self.BytesIO(payload), encoding="ascii",
                                errors="ignore")
    self.assertEqual(reader.read(), "abc\n\n")
    # (4) replace
    reader = self.TextIOWrapper(self.BytesIO(payload), encoding="ascii",
                                errors="replace")
    self.assertEqual(reader.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002539
def test_encoding_errors_writing(self):
    # (1) default
    writer = self.TextIOWrapper(self.BytesIO(), encoding="ascii")
    self.assertRaises(UnicodeError, writer.write, "\xff")
    # (2) explicit strict
    writer = self.TextIOWrapper(self.BytesIO(), encoding="ascii",
                                errors="strict")
    self.assertRaises(UnicodeError, writer.write, "\xff")
    # (3) ignore, then (4) replace
    lenient_cases = (
        ("ignore", b"abcdef\n"),
        ("replace", b"abc?def\n"),
    )
    for handler, encoded in lenient_cases:
        sink = self.BytesIO()
        writer = self.TextIOWrapper(sink, encoding="ascii", errors=handler,
                                    newline="\n")
        writer.write("abc\xffdef\n")
        writer.flush()
        self.assertEqual(sink.getvalue(), encoded)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002563
def test_newlines(self):
    input_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]

    # (newline argument, lines expected when reading input_lines back)
    tests = [
        [None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']],
        ['', input_lines],
        ['\n', ["unix\n", "windows\r\n", "os9\rlast\n", "nonl"]],
        ['\r\n', ["unix\nwindows\r\n", "os9\rlast\nnonl"]],
        ['\r', ["unix\nwindows\r", "\nos9\r", "last\nnonl"]],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    joined = ''.join(input_lines)
    for encoding in encodings:
        data = joined.encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        # Read two characters, then the rest of the line,
                        # until read() reports end of file.
                        got_lines = []
                        for head in iter(lambda: textio.read(2), ''):
                            self.assertEqual(len(head), 2)
                            got_lines.append(head + textio.readline())
                    else:
                        got_lines = list(textio)

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002605
def test_newlines_input(self):
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    # (newline argument, lines expected from readlines())
    cases = [
        (None, normalized.decode("ascii").splitlines(keepends=True)),
        ("", testdata.decode("ascii").splitlines(keepends=True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        reader = self.TextIOWrapper(self.BytesIO(testdata),
                                    encoding="ascii", newline=newline)
        self.assertEqual(reader.readlines(), expected)
        # read() from the start must give the same text in one piece.
        reader.seek(0)
        self.assertEqual(reader.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002621
def test_newlines_output(self):
    # Expected raw bytes for each explicit newline argument.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is expected to behave like the platform's os.linesep.
    tests = [(None, testdict[os.linesep])]
    tests.extend(sorted(testdict.items()))
    chunks = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in tests:
        sink = self.BytesIO()
        writer = self.TextIOWrapper(sink, encoding="ascii", newline=newline)
        for chunk in chunks:
            writer.write(chunk)
        writer.flush()
        self.assertEqual(sink.closed, False)
        self.assertEqual(sink.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002639
def test_destructor(self):
    seen_at_close = []
    parent = self.BytesIO

    class MyBytesIO(parent):
        def close(self):
            # Record what had been written by the time close() runs.
            seen_at_close.append(self.getvalue())
            parent.close(self)

    sink = MyBytesIO()
    wrapper = self.TextIOWrapper(sink, encoding="ascii")
    wrapper.write("abc")
    # Dropping the last reference must flush the text and close the sink.
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002653
def test_override_destructor(self):
    # Destroying the object must call __del__, close() and flush(), in
    # that order, on the subclass overrides.
    events = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            events.append(1)
            try:
                inherited_del = super().__del__
            except AttributeError:
                # The base class may not define __del__ at all.
                pass
            else:
                inherited_del()

        def close(self):
            events.append(2)
            super().close()

        def flush(self):
            events.append(3)
            super().flush()

    wrapper = MyTextIO(self.BytesIO(), encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(events, [1, 2, 3])
2676
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    report = captured.getvalue().strip()
    if not report:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(report.splitlines()), 1)
    self.assertTrue(report.startswith("Exception OSError: "), report)
    self.assertTrue(report.endswith(" ignored"), report)
Guido van Rossum8358db22007-08-18 21:39:55 +00002691
Guido van Rossum9b76da62007-04-11 01:09:03 +00002692 # Systematic tests of the text I/O API
2693
def test_basic_io(self):
    # Round-trip write/read/seek/tell on a real file for several chunk
    # sizes and encodings.  The files are opened with ``with`` so that the
    # handle is closed even when an assertion fails; otherwise the open
    # file could get in the way of the unlink() done in tearDown().
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin-1", "utf-8":  # , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position cookie for the end of the initial content.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2722
def multi_line_test(self, f, enc):
    # Write lines of many lengths, recording tell() before each one, then
    # read them back and check that positions and contents both match.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    period = len(sample)
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        line = "".join(sample[k % period] for k in range(size)) + "\n"
        written.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    read_back = []
    while True:
        position = f.tell()
        line = f.readline()
        if not line:
            break
        read_back.append((position, line))
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002744
def test_telling(self):
    # tell() must report the same positions while reading as it did while
    # writing.  The file is opened with ``with`` so that the handle is
    # closed even when an assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to fail while the file is being iterated.
            self.assertRaises(OSError, f.tell)
        # Once iteration has finished, tell() works again.
        self.assertEqual(f.tell(), p2)
2764
def test_seeking(self):
    # Build a line whose multi-byte character starts two bytes before the
    # default chunk size.
    prefix_size = _default_chunk_size() - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    line = prefix + u_suffix.encode("utf-8")
    with self.open(support.TESTFN, "wb") as out:
        out.write(line*2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as reader:
        head = reader.read(prefix_size)
        self.assertEqual(head, str(prefix, "ascii"))
        self.assertEqual(reader.tell(), prefix_size)
        self.assertEqual(reader.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002781
def test_seeking_too(self):
    # Regression test for a specific bug: readline() followed by
    # tell() with _CHUNK_SIZE forced down to 2 on a file that starts
    # with a three-byte UTF-8 sequence.
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as writer:
        writer.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as reader:
        reader._CHUNK_SIZE  # Just test that it exists
        reader._CHUNK_SIZE = 2
        reader.readline()
        reader.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002792
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened in a ``with`` block so that a failing
        # assertion cannot leave an open handle on TESTFN behind.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Seeking back to the cookie must restore the
                    # position reached after the first read.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(data)

        # Position each test case so that it crosses a chunk boundary.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(data)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + data, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002839
def test_encoded_writes(self):
    # Check if the BOM is written only once (see issue1753).
    text = "1234567890"
    doubled = text * 2
    for encoding in ("utf-16", "utf-16-le", "utf-16-be",
                     "utf-32", "utf-32-le", "utf-32-be"):
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding=encoding)
        # Two separate writes ...
        for _ in range(2):
            wrapper.write(text)
        # ... then rewind and read everything back, twice.
        for _ in range(2):
            wrapper.seek(0)
            self.assertEqual(wrapper.read(), doubled)
        self.assertEqual(raw.getvalue(), doubled.encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002859
def test_unreadable(self):
    # read() on a wrapper whose buffer reports readable() == False
    # must raise OSError.
    class UnReadable(self.BytesIO):
        def readable(self):
            return False

    wrapper = self.TextIOWrapper(UnReadable())
    with self.assertRaises(OSError):
        wrapper.read()
Benjamin Petersona1b49012009-03-31 23:11:32 +00002866
def test_read_one_by_one(self):
    # Read one character at a time until read() returns an empty
    # string; the "\r\n" in the data must come back as "\n".
    txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    collected = "".join(iter(lambda: txt.read(1), ""))
    self.assertEqual(collected, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002876
def test_readlines(self):
    # readlines() without a hint, with None, and with a hint of 5.
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]
    self.assertEqual(wrapper.readlines(), every_line)
    wrapper.seek(0)
    self.assertEqual(wrapper.readlines(None), every_line)
    wrapper.seek(0)
    # With a hint of 5 only the first two lines are returned.
    self.assertEqual(wrapper.readlines(5), every_line[:2])
2884
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002885 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # Read 128 characters at a time from data in which "\r\n"
    # straddles the 128 char boundary.
    txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    pieces = list(iter(lambda: txt.read(128), ""))
    self.assertEqual("".join(pieces), "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002896
def test_writelines(self):
    # writelines() given a plain list of str writes their
    # concatenation to the underlying buffer once flushed.
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw)
    wrapper.writelines(['ab', 'cd', 'ef'])
    wrapper.flush()
    self.assertEqual(raw.getvalue(), b'abcdef')
2904
def test_writelines_userlist(self):
    # writelines() also accepts a UserList of str.
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw)
    wrapper.writelines(UserList(['ab', 'cd', 'ef']))
    wrapper.flush()
    self.assertEqual(raw.getvalue(), b'abcdef')
2912
def test_writelines_error(self):
    # writelines() must raise TypeError for a list of ints, for None
    # and for a bytes object.
    wrapper = self.TextIOWrapper(self.BytesIO())
    for bad_lines in ([1, 2, 3], None, b'abc'):
        with self.assertRaises(TypeError):
            wrapper.writelines(bad_lines)
2918
def test_issue1395_1(self):
    # read one char at a time until read() returns an empty string
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    collected = "".join(iter(lambda: txt.read(1), ""))
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002930
def test_issue1395_2(self):
    # read four chars at a time, with the chunk size forced to 4
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    collected = "".join(iter(lambda: txt.read(4), ""))
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002942
def test_issue1395_3(self):
    # Mix read(4) and readline() with the chunk size forced to 4.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # The list elements are evaluated left to right, so the calls
    # happen in this order: two reads, then three readlines.
    pieces = [
        txt.read(4),
        txt.read(4),
        txt.readline(),
        txt.readline(),
        txt.readline(),
    ]
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002953
def test_issue1395_4(self):
    # read(4) followed by an unbounded read(), chunk size forced to 4.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    head = txt.read(4)
    rest = txt.read()
    self.assertEqual(head + rest, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002961
def test_issue1395_5(self):
    # tell() after read(4) must give a position that seek() can
    # return to after rewinding to the start.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    txt.read(4)  # result not needed, only the position it leaves
    resume_at = txt.tell()
    txt.seek(0)
    txt.seek(resume_at)
    self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002971
def test_issue2282(self):
    # The wrapper must report the same seekable() value as the
    # buffer it wraps.
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    self.assertEqual(raw.seekable(), wrapper.seekable())
2977
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN

    def on_disk():
        # Raw bytes currently stored in the test file.
        with self.open(path, 'rb') as raw:
            return raw.read()

    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as out:
            out.write('aaa')
            out.tell()  # result unused; the call itself is kept
        self.assertEqual(on_disk(), 'aaa'.encode(charset))

        with self.open(path, 'a', encoding=charset) as out:
            out.write('xxx')
        self.assertEqual(on_disk(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002992
def test_seek_bom(self):
    # The BOM is not written again when writing after an explicit
    # seek(), whether to a saved position or back to 0.
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as out:
            out.write('aaa')
            end_of_first = out.tell()
        with self.open(path, 'r+', encoding=charset) as update:
            update.seek(end_of_first)
            update.write('zzz')
            update.seek(0)
            update.write('bbb')
        with self.open(path, 'rb') as raw:
            stored = raw.read()
            self.assertEqual(stored, 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003007
def test_seek_append_bom(self):
    # The BOM is not written again when, in append mode, the file is
    # first seeked to the start and then to the end before writing.
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as out:
            out.write('aaa')
        with self.open(path, 'a', encoding=charset) as appender:
            appender.seek(0)
            appender.seek(0, self.SEEK_END)
            appender.write('xxx')
        with self.open(path, 'rb') as raw:
            stored = raw.read()
            self.assertEqual(stored, 'aaaxxx'.encode(charset))
3020
def test_errors_property(self):
    # The "errors" attribute is "strict" when open() is not given an
    # errors argument, and otherwise the value that was passed.
    with self.open(support.TESTFN, "w") as default_handler:
        self.assertEqual(default_handler.errors, "strict")
    with self.open(support.TESTFN, "w", errors="replace") as replacing:
        self.assertEqual(replacing.errors, "replace")
3026
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    thread_count = 20
    go = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as out:
        def worker(index):
            # The line is formatted before waiting on the event.
            line = "Thread%03d\n" % index
            go.wait()
            out.write(line)

        workers = []
        for index in range(thread_count):
            workers.append(threading.Thread(target=worker, args=(index,)))
        with support.start_threads(workers, go.set):
            time.sleep(0.02)
    with self.open(support.TESTFN) as src:
        written = src.read()
        # Each thread's line must appear exactly once.
        for index in range(thread_count):
            self.assertEqual(written.count("Thread%03d\n" % index), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003045
def test_flush_error_on_close(self):
    # Test that text file is closed despite failed flush
    # and that flush() is called before file closed.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    seen_at_flush = []

    def bad_flush():
        # Record the closed flags as they are when flush() runs.
        seen_at_flush[:] = [wrapper.closed, wrapper.buffer.closed]
        raise OSError()

    wrapper.flush = bad_flush
    with self.assertRaises(OSError):  # exception not swallowed
        wrapper.close()
    self.assertTrue(wrapper.closed)
    self.assertTrue(wrapper.buffer.closed)
    self.assertTrue(seen_at_flush)  # flush() called
    wrapper_was_closed, buffer_was_closed = seen_at_flush
    self.assertFalse(wrapper_was_closed)  # flush() called before file closed
    self.assertFalse(buffer_was_closed)
    wrapper.flush = lambda: None  # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003062
def test_close_error_on_close(self):
    # Both flush() and the buffer's close() fail: the close error
    # must propagate with the flush error as its __context__, and
    # the wrapper must not report itself closed.
    def bad_flush():
        raise OSError('flush')

    def bad_close():
        raise OSError('close')

    raw = self.BytesIO(self.testdata)
    raw.close = bad_close
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.flush = bad_flush
    with self.assertRaises(OSError) as caught:  # exception not swallowed
        wrapper.close()
    outer = caught.exception
    self.assertEqual(outer.args, ('close',))
    self.assertIsInstance(outer.__context__, OSError)
    self.assertEqual(outer.__context__.args, ('flush',))
    self.assertFalse(wrapper.closed)
3078
def test_nonnormalized_close_error_on_close(self):
    # Issue #21677
    # Same scenario as a failing flush() plus a failing close(), but
    # both failures are NameErrors triggered by referencing names
    # that are deliberately left undefined.
    def bad_flush():
        raise non_existing_flush

    def bad_close():
        raise non_existing_close

    raw = self.BytesIO(self.testdata)
    raw.close = bad_close
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.flush = bad_flush
    with self.assertRaises(NameError) as caught:  # exception not swallowed
        wrapper.close()
    outer = caught.exception
    self.assertIn('non_existing_close', str(outer))
    self.assertIsInstance(outer.__context__, NameError)
    self.assertIn('non_existing_flush', str(outer.__context__))
    self.assertFalse(wrapper.closed)
3095
def test_multi_close(self):
    # close() may be called repeatedly; flush() afterwards must
    # raise ValueError.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    for _ in range(3):
        wrapper.close()
    with self.assertRaises(ValueError):
        wrapper.flush()
3102
def test_unseekable(self):
    # tell() and seek() on a wrapper around MockUnseekableIO must
    # raise UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.tell()
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.seek(0)
3107
def test_readonly_attributes(self):
    # Assigning to the wrapper's "buffer" attribute must raise
    # AttributeError.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    with self.assertRaises(AttributeError):
        wrapper.buffer = replacement
3113
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks),
                                 encoding='ascii', newline='\n')
    # Reads
    first_four = wrapper.read(4)
    self.assertEqual(first_four, 'abcd')
    rest_of_line = wrapper.readline()
    self.assertEqual(rest_of_line, 'efghi\n')
    remaining = list(wrapper)
    self.assertEqual(remaining, ['jkl\n', 'opq\n'])
3124
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for piece in ('1', '23\n4', '5'):
        wrapper.write(piece)
    # No flush() was issued, yet the raw object has all the data.
    self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3134
def test_bufio_write_through(self):
    # Issue #21396: write_through=True doesn't force a flush()
    # on the underlying binary buffered object.
    flush_called, write_called = [], []

    class BufferedWriter(self.BufferedWriter):
        # Records every flush() and write() call before delegating.
        def flush(self, *args, **kwargs):
            flush_called.append(True)
            return super().flush(*args, **kwargs)

        def write(self, *args, **kwargs):
            write_called.append(True)
            return super().write(*args, **kwargs)

    payload = b"a"
    sink = self.BytesIO()
    buffered = BufferedWriter(sink, len(payload)*2)
    wrapper = self.TextIOWrapper(buffered, encoding='ascii',
                                 write_through=True)
    # write to the buffered io but don't overflow the buffer
    char = payload.decode('ascii')
    wrapper.write(char)

    # buffer.flush is not called with write_through=True
    self.assertFalse(flush_called)
    # buffer.write *is* called with write_through=True
    self.assertTrue(write_called)
    self.assertEqual(sink.getvalue(), b"")  # no flush

    write_called.clear()  # reset
    wrapper.write(char * 10)  # total content is larger than bufio buffer
    self.assertTrue(write_called)
    self.assertEqual(sink.getvalue(), payload * 11)  # all flushed
3166
def test_read_nonbytes(self):
    # Issue #17106
    # Crash when underlying read() returns non-bytes
    def wrap_text_stream():
        # A TextIOWrapper over a StringIO, whose read() returns str.
        return self.TextIOWrapper(self.StringIO('a'))

    # Each wrapper is built outside the assertRaises block so that
    # only the read call itself is expected to raise.
    wrapper = wrap_text_stream()
    with self.assertRaises(TypeError):
        wrapper.read(1)
    wrapper = wrap_text_stream()
    with self.assertRaises(TypeError):
        wrapper.readline()
    wrapper = wrap_text_stream()
    with self.assertRaises(TypeError):
        wrapper.read()
3176
def test_illegal_decoder(self):
    # Issue #17106
    # Bypass the early encoding check added in issue 20404
    def _make_illegal_wrapper():
        codec_info = codecs.lookup("quopri")
        codec_info._is_text_encoding = True
        try:
            return self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
                                      newline='\n', encoding="quopri")
        finally:
            # Always restore the flag, even if construction failed.
            codec_info._is_text_encoding = False

    # Crash when decoder returns non-string
    wrapper = _make_illegal_wrapper()
    with self.assertRaises(TypeError):
        wrapper.read(1)
    wrapper = _make_illegal_wrapper()
    with self.assertRaises(TypeError):
        wrapper.readline()
    wrapper = _make_illegal_wrapper()
    with self.assertRaises(TypeError):
        wrapper.read()
3196
def _check_create_at_shutdown(self, **kwargs):
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    #
    # Runs a script through assert_python_ok() and returns its result.
    # The script imports the io module under test and creates an object
    # whose __del__ builds a TextIOWrapper with **kwargs, then prints "ok".
    template = """if 1:
            import codecs
            import {iomod} as io

            # Avoid looking up codecs at shutdown
            codecs.lookup('utf-8')

            class C:
                def __init__(self):
                    self.buf = io.BytesIO()
                def __del__(self):
                    io.TextIOWrapper(self.buf, **{kwargs})
                    print("ok")
            c = C()
            """
    script = template.format(iomod=self.io.__name__, kwargs=kwargs)
    return assert_python_ok("-c", script)
3217
@support.requires_type_collecting
def test_create_at_shutdown_without_encoding(self):
    _rc, stdout, stderr = self._check_create_at_shutdown()
    if not stderr:
        self.assertEqual("ok", stdout.decode().strip())
        return
    # Creation may fail at shutdown; the error output must then
    # contain the text stored in self.shutdown_error.
    self.assertIn(self.shutdown_error, stderr.decode())
3227
@support.requires_type_collecting
def test_create_at_shutdown_with_encoding(self):
    # With an explicit encoding and error handler nothing may be
    # written to stderr and the script must print "ok".
    _rc, stdout, stderr = self._check_create_at_shutdown(
        encoding='utf-8', errors='strict')
    self.assertFalse(stderr)
    self.assertEqual("ok", stdout.decode().strip())
3234
def test_read_byteslike(self):
    # The wrapper must decode data from a buffer whose read methods
    # return memoryviews rather than bytes.
    source = MemviewBytesIO(b'Just some random string\n')
    wrapper = self.TextIOWrapper(source, 'utf-8')

    # TextIOwrapper will not read the full string, because
    # we truncate it to a multiple of the native int size
    # so that we can construct a more complex memoryview.
    expected_bytes = _to_memoryview(source.getvalue()).tobytes()

    self.assertEqual(wrapper.read(200), expected_bytes.decode('utf-8'))
3245
def test_issue22849(self):
    # Repeatedly try to wrap an object that only answers the
    # readable/writable/seekable queries, ignoring any exception,
    # then add tell() to the class and wrap one more instance.
    class F(object):
        def readable(self):
            return True

        def writable(self):
            return True

        def seekable(self):
            return True

    for _ in range(10):
        try:
            self.TextIOWrapper(F(), encoding='utf-8')
        except Exception:
            # Failures are expected and deliberately ignored here.
            pass

    F.tell = lambda x: 0
    # Bound to a name so the wrapper lives until the test returns.
    wrapper = self.TextIOWrapper(F(), encoding='utf-8')
3260
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003261
class MemviewBytesIO(io.BytesIO):
    '''A BytesIO object whose read method returns memoryviews
       rather than bytes'''

    # Both methods default *len_* to -1, like the BytesIO methods they
    # override, so that calls without a size argument keep working.

    def read1(self, len_=-1):
        '''Return the result of BytesIO.read1(len_) as a memoryview.'''
        return _to_memoryview(super().read1(len_))

    def read(self, len_=-1):
        '''Return the result of BytesIO.read(len_) as a memoryview.'''
        return _to_memoryview(super().read(len_))


def _to_memoryview(buf):
    '''Convert bytes-object *buf* to a non-trivial memoryview

    The data is copied into an array of native ints, so trailing bytes
    that do not fill a whole array item are dropped.
    '''
    arr = array.array('i')
    # Largest prefix length that is a multiple of the item size.
    idx = len(buf) - len(buf) % arr.itemsize
    arr.frombytes(buf[:idx])
    return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003279
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003280
class CTextIOWrapperTest(TextIOWrapperTest):
    # Runs the inherited TextIOWrapperTest cases against the ``io``
    # module and adds tests specific to this implementation.
    io = io
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        # A failed re-initialisation leaves the wrapper unusable.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        with self.assertRaises(ValueError):
            wrapper.__init__(buffered, newline='xyzzy')
        with self.assertRaises(ValueError):
            wrapper.read()

        # repr() of an instance created without __init__ must raise.
        wrapper = self.TextIOWrapper.__new__(self.TextIOWrapper)
        with self.assertRaises(Exception):
            repr(wrapper)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            raw_file = io.FileIO(support.TESTFN, "wb")
            buffered = self.BufferedWriter(raw_file)
            wrapper = self.TextIOWrapper(buffered, encoding="ascii")
            wrapper.write("456def")
            # Self-reference, so only the cycle collector can free it.
            wrapper.x = wrapper
            ref = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as stored:
            self.assertEqual(stored.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()
3325
3326
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Runs the inherited TextIOWrapperTest cases with ``pyio`` as the io
    # module (presumably the pure-Python implementation -- confirm
    # against the file's imports).
    io = pyio
    # Text that test_create_at_shutdown_without_encoding looks for in
    # the child interpreter's error output when there is any.
    shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003330
3331
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Shared tests for self.IncrementalNewlineDecoder; that attribute
    # is not defined in this class.

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def verify(raw, expected, **kwargs):
            # We exercise getstate() / setstate() as well as decode():
            # decode, rewind to the saved state, then decode again.
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(raw, **kwargs), expected)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(raw, **kwargs), expected)

        # A three-byte character, whole and then split byte by byte.
        for raw, expected in [
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),
            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),
            (b'\xe8', ""),
        ]:
            verify(raw, expected)
        # The sequence is left incomplete, so finalizing must fail.
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        verify(b'\n', "\n")
        verify(b'\r', "")
        verify(b'', "\n", final=True)
        verify(b'\r', "\n", final=True)

        # The calls below depend on the decoder state left by the
        # previous ones, so their order matters.
        for raw, expected in [
            (b'\r', ""),
            (b'a', "\na"),
            (b'\r\r\n', "\n\n"),
            (b'\r', ""),
            (b'\r', "\n"),
            (b'\na', "\na"),
            (b'\xe8\xa2\x88\r\n', "\u8888\n"),
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\n', "\n"),
            (b'\xe8\xa2\x88\r', "\u8888"),
            (b'\n', "\n"),
        ]:
            verify(raw, expected)

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to *decoder* one unit at a time and check both the
        # decoded output and the "newlines" attribute along the way.
        decoded_parts = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()

            def feed(text):
                # Decode one byte at a time
                decoded_parts.extend(decoder.decode(bytes([byte]))
                                     for byte in encoder.encode(text))
        else:
            encoder = None

            def feed(text):
                # Decode one char at a time
                decoded_parts.extend(decoder.decode(char) for char in text)

        self.assertEqual(decoder.newlines, None)
        # (text to feed, value of decoder.newlines expected afterwards)
        steps = [
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        ]
        for text, newlines_after in steps:
            feed(text)
            self.assertEqual(decoder.newlines, newlines_after)
        feed("abc\r")
        self.assertEqual("".join(decoded_parts), "abc\n\nabcabc\nabcabc")

        # After reset() the newlines seen so far are forgotten.
        decoder.reset()
        payload = "abc"
        if encoder is not None:
            encoder.reset()
            payload = encoder.encode(payload)
        self.assertEqual(decoder.decode(payload), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                inner = None
            else:
                inner = codecs.getincrementaldecoder(enc)()
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)
        utf8_inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(utf8_inner, translate=True)
        self.check_newline_decoding_utf8(wrapped)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        for translate in (False, True):
            dec = self.IncrementalNewlineDecoder(None, translate=translate)
            self.assertEqual(dec.newlines, None)
            # Neither character may be reported as a newline.
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)
3437
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Runs the inherited tests unchanged.  load_tests() copies the names of
    # the ``io`` module onto every test class whose name starts with "C",
    # which is how ``self.IncrementalNewlineDecoder`` gets bound here.
    pass
3440
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Runs the inherited tests unchanged.  load_tests() copies the names of
    # the ``pyio`` module onto every test class whose name starts with "Py",
    # which is how ``self.IncrementalNewlineDecoder`` gets bound here.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003443
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003444
Guido van Rossum01a27522007-03-07 01:00:12 +00003445# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003446
class MiscIOTest(unittest.TestCase):
    """Miscellaneous checks of an io implementation's public surface.

    The implementation under test is never named directly: it is reached
    through attributes such as ``self.io``, ``self.open`` and
    ``self.IOBase``, which the concrete subclasses (and load_tests()) are
    expected to provide.
    """

    def tearDown(self):
        # Every test uses support.TESTFN as its scratch file.
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Each public name must exist; exception-like names must be
        # Exception subclasses and everything else, except open() and the
        # SEEK_* constants, must derive from IOBase.
        module = self.io
        for public_name in module.__all__:
            exported = getattr(module, public_name, None)
            self.assertIsNotNone(exported, public_name)
            if public_name == "open":
                continue
            names_exception = ("error" in public_name.lower()
                               or public_name == "UnsupportedOperation")
            if names_exception:
                self.assertTrue(issubclass(exported, Exception), public_name)
            elif not public_name.startswith("SEEK_"):
                self.assertTrue(issubclass(exported, self.IOBase))

    def test_attributes(self):
        path = support.TESTFN

        # Unbuffered binary file.
        raw_file = self.open(path, "wb", buffering=0)
        self.assertEqual(raw_file.mode, "wb")
        raw_file.close()

        # Deprecated "U" mode: all layers share the name, and the layers
        # below the text wrapper report a binary read mode.
        with support.check_warnings(('', DeprecationWarning)):
            text_file = self.open(path, "U")
        lower_layers = (text_file.buffer, text_file.buffer.raw)
        for layer in (text_file,) + lower_layers:
            self.assertEqual(layer.name, path)
        self.assertEqual(text_file.mode, "U")
        for layer in lower_layers:
            self.assertEqual(layer.mode, "rb")
        text_file.close()

        update_file = self.open(path, "w+")
        self.assertEqual(update_file.mode, "w+")
        # Does it really matter?
        self.assertEqual(update_file.buffer.mode, "rb+")
        self.assertEqual(update_file.buffer.raw.mode, "rb+")

        # A second wrapper around the same descriptor is named after it.
        fd = update_file.fileno()
        fd_file = self.open(fd, "wb", closefd=False)
        self.assertEqual(fd_file.mode, "wb")
        self.assertEqual(fd_file.raw.mode, "wb")
        self.assertEqual(fd_file.name, fd)
        self.assertEqual(fd_file.raw.name, fd)
        update_file.close()
        fd_file.close()

    def test_io_after_close(self):
        # Each text mode is paired with its binary counterpart and with
        # several buffering settings.
        configurations = []
        for text_mode in ("w", "r", "w+"):
            binary_mode = text_mode + "b"
            configurations += [
                {"mode": text_mode},
                {"mode": binary_mode},
                {"mode": text_mode, "buffering": 1},
                {"mode": text_mode, "buffering": 2},
                {"mode": binary_mode, "buffering": 0},
            ]

        for kwargs in configurations:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            empty = b"" if "b" in kwargs['mode'] else ""
            # (method name, call arguments, checked only if present)
            operations = [
                ("flush", (), False),
                ("fileno", (), False),
                ("isatty", (), False),
                ("__iter__", (), False),
                ("peek", (1,), True),
                ("read", (), False),
                ("read1", (1024,), True),
                ("read1", (), True),
                ("readall", (), True),
                ("readinto", (bytearray(1024),), True),
                ("readinto1", (bytearray(1024),), True),
                ("readline", (), False),
                ("readlines", (), False),
                ("readlines", (1,), False),
                ("seek", (0,), False),
                ("tell", (), False),
                ("truncate", (), False),
                ("write", (empty,), False),
                ("writelines", ([],), False),
            ]
            for method_name, call_args, optional in operations:
                if optional and not hasattr(f, method_name):
                    continue
                self.assertRaises(ValueError, getattr(f, method_name),
                                  *call_args)
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        #
        # Build a reference cycle between the exception and one of its
        # arguments, then check that a collection reclaims it.
        class C(str):
            pass
        payload = C("")
        exc = self.BlockingIOError(1, payload)
        payload.b = exc
        exc.c = payload
        payload_ref = weakref.ref(payload)
        del payload, exc
        support.gc_collect()
        self.assertIsNone(payload_ref(), payload_ref)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        visible_bases = (self.IOBase, self.RawIOBase,
                         self.BufferedIOBase, self.TextIOBase)
        for base in visible_bases:
            self.assertIsInstance(base, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        """Check which ABCs of *abcmodule* each kind of opened file matches.

        *abcmodule* is any object exposing ``IOBase``, ``RawIOBase``,
        ``BufferedIOBase`` and ``TextIOBase``.  Every file must be an
        ``IOBase`` and match exactly one of the three layer ABCs.
        """
        layer_names = ("RawIOBase", "BufferedIOBase", "TextIOBase")
        # (mode, extra open() keywords, the one layer ABC that must match)
        cases = (
            ("wb", {"buffering": 0}, "RawIOBase"),
            ("wb", {}, "BufferedIOBase"),
            ("w", {}, "TextIOBase"),
        )
        for mode, extra, matching_layer in cases:
            with self.open(support.TESTFN, mode, **extra) as f:
                self.assertIsInstance(f, abcmodule.IOBase)
                for layer_name in layer_names:
                    layer = getattr(abcmodule, layer_name)
                    if layer_name == matching_layer:
                        self.assertIsInstance(f, layer)
                    else:
                        self.assertNotIsInstance(f, layer)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)

    def _check_warn_on_dealloc(self, *args, **kwargs):
        """Open a file, drop it unclosed and expect a ResourceWarning.

        The arguments are forwarded to open().  The warning text must
        contain the repr() the file had while it was alive.
        """
        # NOTE(review): this calls the builtin open(), not self.open --
        # confirm that is intended.
        leaked = open(*args, **kwargs)
        expected_repr = repr(leaked)
        with self.assertWarns(ResourceWarning) as caught:
            del leaked
            support.gc_collect()
        self.assertIn(expected_repr, str(caught.warning.args[0]))

    def test_warn_on_dealloc(self):
        for open_args, open_kwargs in (
                (("wb",), {"buffering": 0}),
                (("wb",), {}),
                (("w",), {}),
        ):
            self._check_warn_on_dealloc(support.TESTFN, *open_args,
                                        **open_kwargs)

    def _check_warn_on_dealloc_fd(self, *args, **kwargs):
        """Like _check_warn_on_dealloc(), for files opened on a pipe fd.

        Also checks that no ResourceWarning is emitted when the file was
        opened with ``closefd=False``.
        """
        pipe_fds = []

        def close_pipe_fds():
            # Descriptors already closed by a file object report EBADF.
            for fd in pipe_fds:
                try:
                    os.close(fd)
                except OSError as err:
                    if err.errno != errno.EBADF:
                        raise
        self.addCleanup(close_pipe_fds)

        read_fd, write_fd = os.pipe()
        pipe_fds.extend((read_fd, write_fd))
        self._check_warn_on_dealloc(read_fd, *args, **kwargs)

        # When using closefd=False, there's no warning
        read_fd, write_fd = os.pipe()
        pipe_fds.extend((read_fd, write_fd))
        with support.check_no_resource_warning(self):
            open(read_fd, *args, closefd=False, **kwargs)

    def test_warn_on_dealloc_fd(self):
        for open_args, open_kwargs in (
                (("rb",), {"buffering": 0}),
                (("rb",), {}),
                (("r",), {}),
        ):
            self._check_warn_on_dealloc_fd(*open_args, **open_kwargs)

    def test_pickling(self):
        # Pickling file objects is forbidden
        configurations = []
        for text_mode in ("w", "r", "w+"):
            binary_mode = text_mode + "b"
            configurations += [
                {"mode": text_mode},
                {"mode": binary_mode},
                {"mode": binary_mode, "buffering": 0},
            ]
        protocols = range(pickle.HIGHEST_PROTOCOL + 1)
        for kwargs in configurations:
            for protocol in protocols:
                with self.open(support.TESTFN, **kwargs) as f:
                    with self.assertRaises(TypeError):
                        pickle.dumps(f, protocol)

    def test_nonblock_pipe_write_bigbuf(self):
        self._test_nonblock_pipe_write(16*1024)

    def test_nonblock_pipe_write_smallbuf(self):
        self._test_nonblock_pipe_write(1024)

    @unittest.skipUnless(hasattr(os, 'set_blocking'),
                         'os.set_blocking() required for this test')
    def _test_nonblock_pipe_write(self, bufsize):
        """Write through a non-blocking pipe until it blocks, then drain it.

        Both ends are wrapped in buffered files using *bufsize*.  Whatever
        was accepted by the writer must come out of the reader, byte for
        byte, and both files must be closed at the end.
        """
        sent = []
        received = []
        read_fd, write_fd = os.pipe()
        for fd in (read_fd, write_fd):
            os.set_blocking(fd, False)

        # To exercise all code paths in the C implementation we need
        # to play with buffer sizes.  For instance, if we choose a
        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
        # then we will never get a partial write of the buffer.
        rf = self.open(read_fd, mode='rb', closefd=True, buffering=bufsize)
        wf = self.open(write_fd, mode='wb', closefd=True, buffering=bufsize)

        with rf, wf:
            for chunk_size in (9999, 73, 7574):
                try:
                    # Keep writing chunks of a cycling lowercase letter
                    # until the writer reports that it would block.
                    for index in count():
                        chunk = bytes([ord('a') + index % 26]) * chunk_size
                        sent.append(chunk)
                        wf.write(chunk)

                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    # Only part of the last chunk was accepted.
                    sent[-1] = sent[-1][:e.characters_written]
                    received.append(rf.read())
                    marker = b'BLOCKED'
                    wf.write(marker)
                    sent.append(marker)

            # Flush whatever is still buffered, draining the pipe each
            # time the flush would block.
            flushed = False
            while not flushed:
                try:
                    wf.flush()
                    flushed = True
                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    self.assertEqual(e.characters_written, 0)
                    received.append(rf.read())

            # Read until read() returns None.
            received.extend(iter(rf.read, None))

        all_sent = b''.join(sent)
        all_received = b''.join(received)
        self.assertEqual(all_sent, all_received)
        self.assertTrue(wf.closed)
        self.assertTrue(rf.closed)

    def test_create_fail(self):
        # 'x' mode fails if file is existing
        self.open(support.TESTFN, 'w').close()
        with self.assertRaises(FileExistsError):
            self.open(support.TESTFN, 'x')

    def test_create_writes(self):
        # 'x' mode opens for writing
        payload = b"spam"
        with self.open(support.TESTFN, 'xb') as created:
            created.write(payload)
        with self.open(support.TESTFN, 'rb') as reopened:
            self.assertEqual(payload, reopened.read())

    def test_open_allargs(self):
        # there used to be a buffer overflow in the parser for rawmode
        with self.assertRaises(ValueError):
            self.open(support.TESTFN, 'rwax+')
3713
3714
class CMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the ``io`` module, plus tests specific to it."""

    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        #
        # read() hands back far more data than the 2-byte target buffer
        # can hold; readinto() must refuse it with ValueError.
        class BadReader(self.io.BufferedIOBase):
            def read(self, n=-1):
                return b'x' * 10**6
        reader = BadReader()
        target = bytearray(2)
        with self.assertRaises(ValueError):
            reader.readinto(target)

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        """Run a child interpreter that writes to a std stream from two threads.

        *stream_name* is the name of the ``sys`` attribute to write to.
        The child must either exit cleanly having written nothing but
        '.' and '!' characters to stderr, or fail with the expected fatal
        error message about the stream's lock.
        """
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        template = """if 1:
            import sys
            import time
            import threading
            from test.support import SuppressCrashReport

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            crash = SuppressCrashReport()
            crash.__enter__()
            # don't call __exit__(): the crash occurs at Python shutdown

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """
        code = template.format(stream_name=stream_name)
        res, _ = run_python_until_end("-c", code)
        err = res.err.decode()
        if res.rc == 0:
            self.assertFalse(err.strip('.!'))
        else:
            # Failure: should be a fatal error
            expected = ("Fatal Python error: could not acquire lock "
                        "for <_io.BufferedWriter name='<{stream_name}>'> "
                        "at interpreter shutdown, possibly due to "
                        "daemon threads").format(stream_name=stream_name)
            self.assertIn(expected, err)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stdout')

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stderr')
3773
3774
class PyMiscIOTest(MiscIOTest):
    # Bind the inherited tests to the ``pyio`` module; the remaining
    # attributes (open, IOBase, ...) are injected by load_tests().
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003777
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003778
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Check how reads and writes behave when interrupted by SIGALRM.

    The io implementation under test is reached through ``self.io``, which
    concrete subclasses provide.  Every helper that arms an alarm also
    cancels it on exit, so a failing test cannot leave a SIGALRM pending
    once tearDown() has restored the previous handler.
    """

    def setUp(self):
        # Remember the previous handler so tearDown() can restore it.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raise ZeroDivisionError: tests use it as evidence
        # that the signal handler ran.
        1/0

    def _close_ignoring_ebadf(self, stream):
        """Close *stream*, tolerating EBADF; do nothing if it is None.

        Used for files whose descriptor has already been closed on purpose
        before the file object itself is closed.
        """
        if stream is None:
            return
        try:
            stream.close()
        except OSError as e:
            if e.errno != errno.EBADF:
                raise

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit of data that is repeated to build the large
        payload, *bytes* is its expected encoded form, and *fdopen_kwargs*
        are passed to ``self.io.open()`` (``closefd`` is forced to False).
        """
        read_results = []
        def _read():
            # Keep SIGALRM out of this thread when the platform allows it.
            if hasattr(signal, 'pthread_sigmask'):
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        # Bound before the try block so that the cleanup below works even
        # if open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed below.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                signal.alarm(0)
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            self._close_ignoring_ebadf(wio)

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    # Issue #22331: The test hangs on FreeBSD 7.2
    @support.requires_freebsd_version(8)
    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check a write() issued from a signal handler during a write().

        *data* is written in a loop until SIGALRM fires; the handler then
        writes *data* again on the same file.  Either that reentrant call
        fails with a "reentrant call" RuntimeError or the handler's
        ZeroDivisionError propagates.
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Cancel the alarm in case the loop failed before it fired.
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read() to str, and
        *fdopen_kwargs* are passed to ``self.io.open()`` (``closefd`` is
        forced to False).
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so that the cleanup below works even
        # if open() itself fails.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Cancel the alarm in case the read failed before it fired.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is repeated ``support.PIPE_MAX_SIZE`` times to build the
        payload, and *fdopen_kwargs* are passed to ``self.io.open()``
        (``closefd`` is forced to False).
        """
        select = support.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        error = None
        def _read():
            nonlocal error
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                # Reported to the main thread, which asserts it is None.
                error = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: only re-arm, with alarm2 as the next handler.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start draining the pipe.
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so that the cleanup below works even
        # if open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error)
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Cancel any alarm that is still pending after a failure.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            self._close_ignoring_ebadf(wio)

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3986
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003987
class CSignalsTest(SignalsTest):
    # Run the inherited signal tests against the ``io`` module.
    io = io
3990
class PySignalsTest(SignalsTest):
    # Run the inherited signal tests against the ``pyio`` module.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.  The inherited test methods are overridden with
    # None here.
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None
3998
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003999
def load_tests(*args):
    """Build the test suite for this module.

    Before the suite is assembled, each test class whose name starts with
    "C" receives the public names of ``io`` as class attributes, and each
    class whose name starts with "Py" receives those of ``pyio``; both also
    receive the matching "C..."/"Py..." variants of the mock classes under
    the mocks' unprefixed names.  Other classes are left untouched.

    The positional arguments are accepted and ignored.

    Returns a ``unittest.TestSuite`` holding the tests of every listed
    class.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    globs = globals()
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # unittest.makeSuite() is deprecated; the default loader's
    # loadTestsFromTestCase() collects the same tests.
    loader = unittest.defaultTestLoader
    suite = unittest.TestSuite(
        [loader.loadTestsFromTestCase(test) for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004035
# Run the tests when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()