blob: fc68b09d8fbd849ce6c4a6864355fc408432c429 [file] [log] [blame]
"""Unit tests for the io module."""

# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Berker Peksagce643912015-05-06 06:33:17 +030038from test.support.script_helper import assert_python_ok, run_python_until_end
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000047
try:
    import ctypes
except ImportError:
    def byteslike(*pos, **kw):
        """Build an ``array.array('b')`` from ``bytes(*pos, **kw)``.

        Used only when ctypes cannot be imported.
        """
        payload = bytes(*pos, **kw)
        return array.array("b", payload)
else:
    class EmptyStruct(ctypes.Structure):
        """Field-less ctypes structure used as resizable raw storage."""

    def byteslike(*pos, **kw):
        """Create a bytes-like object having no string or sequence methods"""
        payload = bytes(*pos, **kw)
        holder = EmptyStruct()
        # Grow the structure's storage to fit the payload, then copy the
        # payload in through a byte-oriented view of that storage.
        ctypes.resize(holder, len(payload))
        view = memoryview(holder).cast("B")
        view[:] = payload
        return holder
63
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000064def _default_chunk_size():
65 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000066 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000067 return f._CHUNK_SIZE
68
69
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    Chunks to hand out are taken from ``read_stack`` in order; a ``None``
    entry makes readinto() return ``None``.  Written data is collected in
    ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        # Call counters: every readinto() call, and those made after the
        # read stack ran dry.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        # Store an immutable copy of what was written.
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        self._reads += 1
        capacity = len(buf)
        pending = self._read_stack
        if not pending:
            self._extraneous_reads += 1
            return 0
        chunk = pending[0]
        if chunk is None:
            del pending[0]
            return None
        size = len(chunk)
        if size > capacity:
            # Only part of the chunk fits: hand out a prefix and keep the
            # remainder at the head of the stack for the next call.
            buf[:] = chunk[:capacity]
            pending[0] = chunk[capacity:]
            return capacity
        del pending[0]
        buf[:size] = chunk
        return size

    def truncate(self, pos=None):
        return pos
125
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead on top of the C implementation's RawIOBase."""
128
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead on top of the Python implementation's RawIOBase."""
131
132
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus an explicit read() method."""

    def read(self, n=None):
        """Pop and return the next queued chunk, ignoring *n*.

        Returns b"" and counts an extraneous read once the queue is empty.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue is expected here; anything else
            # (KeyboardInterrupt included) must propagate.
            self._extraneous_reads += 1
            return b""
142
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO on top of the C implementation's RawIOBase."""
145
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO on top of the Python implementation's RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000148
Guido van Rossuma9e20242007-03-08 00:43:48 +0000149
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods return deliberately bogus results."""

    def write(self, b):
        # Report twice as many bytes as the parent accepted.
        accepted = super().write(b)
        return accepted * 2

    def read(self, n=None):
        # Hand back the parent's data repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Let the parent fill the buffer, then claim five times its length.
        super().readinto(buf)
        claimed = len(buf) * 5
        return claimed
166
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO on top of the C implementation's RawIOBase."""
169
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO on top of the Python implementation's RawIOBase."""
172
173
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() marks it closed and raises OSError."""

    closed = 0

    def close(self):
        if self.closed:
            # Already marked closed: later calls are silent no-ops.
            return
        self.closed = 1
        raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000181
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO on top of the C implementation's RawIOBase."""
184
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO on top of the Python implementation's RawIOBase."""
187
188
class MockFileIO:
    """Mixin recording the size of every read()/readinto() result.

    Meant to be combined with a class that supplies ``__init__(data)``,
    ``read`` and ``readinto``; results are logged in ``read_history``.
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        result = super().read(n)
        # Log the length of the data, or None when the read returned None.
        size = len(result) if result is not None else None
        self.read_history.append(size)
        return result

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000204
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO backed by the C implementation's BytesIO."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000207
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO backed by the Python implementation's BytesIO."""
210
211
class MockUnseekableIO:
    """Mixin that reports itself unseekable and rejects positioning calls.

    The concrete class must provide an ``UnsupportedOperation`` attribute
    holding the exception type to raise.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def tell(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def truncate(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error
224
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """MockUnseekableIO backed by the C implementation's BytesIO."""

    UnsupportedOperation = io.UnsupportedOperation
227
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """MockUnseekableIO backed by the Python implementation's BytesIO."""

    UnsupportedOperation = pyio.UnsupportedOperation
230
231
class MockNonBlockWriterIO:
    """Writer mock that can simulate a write that would block.

    After block_on(char), a write() containing *char* either stores only
    the bytes before it, or, when *char* comes first, clears the blocker
    and returns None.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and reset the record."""
        collected = b"".join(self._write_stack)
        del self._write_stack[:]
        return collected

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            try:
                cut = payload.index(blocker)
            except ValueError:
                cut = None
            if cut is not None:
                if cut > 0:
                    # write data up to the first blocker
                    self._write_stack.append(payload[:cut])
                    return cut
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(payload)
        return len(payload)
275
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO on top of the C implementation's RawIOBase."""

    BlockingIOError = io.BlockingIOError
278
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO on top of the Python implementation's RawIOBase."""

    BlockingIOError = pyio.BlockingIOError
281
Guido van Rossuma9e20242007-03-08 00:43:48 +0000282
Guido van Rossum28524c72007-02-27 05:47:44 +0000283class IOTest(unittest.TestCase):
284
def setUp(self):
    # Remove any leftover scratch file before the test runs.
    scratch = support.TESTFN
    support.unlink(scratch)
Neal Norwitze7789b12008-03-24 06:18:09 +0000287
def tearDown(self):
    # Remove the scratch file the test may have created.
    scratch = support.TESTFN
    support.unlink(scratch)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000290
def write_ops(self, f):
    # Drive the binary stream *f* through a fixed write/seek/tell/truncate
    # sequence, checking every return value along the way.
    check = self.assertEqual

    check(f.write(b"blah."), 5)
    f.truncate(0)
    check(f.tell(), 5)  # the position is still 5 after truncate(0)
    f.seek(0)

    check(f.write(b"blah."), 5)
    check(f.seek(0), 0)
    check(f.write(b"Hello."), 6)
    check(f.tell(), 6)
    check(f.seek(-1, 1), 5)
    check(f.tell(), 5)
    scratch = bytearray(b" world\n\n\n")
    check(f.write(scratch), 9)
    scratch[:] = b"*" * 9  # Overwrite our copy of the data
    check(f.seek(0), 0)
    check(f.write(b"h"), 1)
    check(f.seek(-1, 2), 13)
    check(f.tell(), 13)

    check(f.truncate(12), 12)
    check(f.tell(), 13)
    # A float offset must be rejected.
    self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000314
def read_ops(self, f, buffered=False):
    # Drive the binary stream *f* through a fixed read/readinto/seek
    # sequence.  With buffered=True, argument-less read() and readinto1()
    # are exercised as well.
    check = self.assertEqual

    chunk = f.read(5)
    check(chunk, b"hello")
    chunk = byteslike(chunk)
    check(f.readinto(chunk), 5)
    check(bytes(chunk), b" worl")
    chunk = bytearray(5)
    check(f.readinto(chunk), 2)
    check(len(chunk), 5)
    check(chunk[:2], b"d\n")
    check(f.seek(0), 0)
    check(f.read(20), b"hello world\n")
    check(f.read(1), b"")
    check(f.readinto(byteslike(b"x")), 0)
    check(f.seek(-6, 2), 6)
    check(f.read(5), b"world")
    check(f.read(0), b"")
    check(f.readinto(byteslike()), 0)
    check(f.seek(-6, 1), 5)
    check(f.read(5), b" worl")
    check(f.tell(), 10)
    # A float offset must be rejected.
    self.assertRaises(TypeError, f.seek, 0.0)
    if not buffered:
        return
    f.seek(0)
    check(f.read(), b"hello world\n")
    f.seek(6)
    check(f.read(), b"world\n")
    check(f.read(), b"")
    f.seek(0)
    chunk = byteslike(5)
    check(f.readinto1(chunk), 5)
    check(bytes(chunk), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000347
Guido van Rossum34d69e52007-04-10 20:08:41 +0000348 LARGE = 2**31
349
def large_file_ops(self, f):
    # Exercise seek/tell/write/truncate/read around the self.LARGE offset
    # on the read-write stream *f*.  The test is skipped when seeking that
    # far raises OverflowError or ValueError.
    #
    # Preconditions use assertTrue rather than bare ``assert`` so they are
    # still checked when Python runs with -O.
    self.assertTrue(f.readable())
    self.assertTrue(f.writable())
    try:
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
    except (OverflowError, ValueError):
        self.skipTest("no largefile support")
    self.assertEqual(f.tell(), self.LARGE)
    self.assertEqual(f.write(b"xxx"), 3)
    self.assertEqual(f.tell(), self.LARGE + 3)
    self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
    self.assertEqual(f.truncate(), self.LARGE + 2)
    self.assertEqual(f.tell(), self.LARGE + 2)
    self.assertEqual(f.seek(0, 2), self.LARGE + 2)
    self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
    # The position is expected to be unchanged by the explicit truncate.
    self.assertEqual(f.tell(), self.LARGE + 2)
    self.assertEqual(f.seek(0, 2), self.LARGE + 1)
    self.assertEqual(f.seek(-1, 2), self.LARGE)
    self.assertEqual(f.read(2), b"x")
369
def test_invalid_operations(self):
    # Try writing on a file opened in read mode and vice-versa.
    unsupported = self.UnsupportedOperation
    path = support.TESTFN

    # Write-only files must refuse to read.
    write_only = (("w", {}), ("wb", {}), ("wb", {"buffering": 0}))
    for mode, extra in write_only:
        with self.open(path, mode, **extra) as fp:
            self.assertRaises(unsupported, fp.read)
            self.assertRaises(unsupported, fp.readline)

    # Read-only binary files must refuse to write.
    read_only = (("rb", {"buffering": 0}), ("rb", {}))
    for mode, extra in read_only:
        with self.open(path, mode, **extra) as fp:
            self.assertRaises(unsupported, fp.write, b"blah")
            self.assertRaises(unsupported, fp.writelines, [b"blah\n"])

    with self.open(path, "r") as fp:
        self.assertRaises(unsupported, fp.write, "blah")
        self.assertRaises(unsupported, fp.writelines, ["blah\n"])
        # Non-zero seeking from current or end pos
        self.assertRaises(unsupported, fp.seek, 1, self.SEEK_CUR)
        self.assertRaises(unsupported, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000392
def test_optional_abilities(self):
    # Test for OSError when optional APIs are not supported
    # The purpose of this test is to try fileno(), reading, writing and
    # seeking operations with various objects that indicate they do not
    # support these operations.

    def pipe_reader():
        read_fd, write_fd = os.pipe()
        os.close(write_fd)  # So that read() is harmless
        return self.FileIO(read_fd, "r")

    def pipe_writer():
        read_fd, write_fd = os.pipe()
        self.addCleanup(os.close, read_fd)
        # Guarantee that we can write into the pipe without blocking
        drain = threading.Thread(target=os.read, args=(read_fd, 100))
        drain.start()
        self.addCleanup(drain.join)
        return self.FileIO(write_fd, "w")

    def buffered_reader():
        return self.BufferedReader(self.MockUnseekableIO())

    def buffered_writer():
        return self.BufferedWriter(self.MockUnseekableIO())

    def buffered_random():
        return self.BufferedRandom(self.BytesIO())

    def buffered_rw_pair():
        reader = self.MockUnseekableIO()
        writer = self.MockUnseekableIO()
        return self.BufferedRWPair(reader, writer)

    def text_reader():
        # Swap in BufferedIOBase's writable/write for the mock's own.
        class UnseekableReader(self.MockUnseekableIO):
            writable = self.BufferedIOBase.writable
            write = self.BufferedIOBase.write
        return self.TextIOWrapper(UnseekableReader(), "ascii")

    def text_writer():
        # Swap in BufferedIOBase's readable/read for the mock's own.
        class UnseekableWriter(self.MockUnseekableIO):
            readable = self.BufferedIOBase.readable
            read = self.BufferedIOBase.read
        return self.TextIOWrapper(UnseekableWriter(), "ascii")

    # Each entry pairs a factory with its expected abilities:
    # "f" = fileno(), "r" = readable, "w" = writable, "s" = seekable.
    tests = (
        (pipe_reader, "fr"), (pipe_writer, "fw"),
        (buffered_reader, "r"), (buffered_writer, "w"),
        (buffered_random, "rws"), (buffered_rw_pair, "rw"),
        (text_reader, "r"), (text_writer, "w"),
        (self.BytesIO, "rws"), (self.StringIO, "rws"),
    )
    for factory, abilities in tests:
        if factory is pipe_writer and not threading:
            continue  # Skip subtest that uses a background thread
        with self.subTest(factory), factory() as obj:
            readable = "r" in abilities
            self.assertEqual(obj.readable(), readable)
            writable = "w" in abilities
            self.assertEqual(obj.writable(), writable)

            # Pick a payload of the type the object's layer accepts.
            if isinstance(obj, self.TextIOBase):
                data = "3"
            elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
                data = b"3"
            else:
                self.fail("Unknown base class")

            if "f" not in abilities:
                self.assertRaises(OSError, obj.fileno)
            else:
                obj.fileno()

            if not readable:
                self.assertRaises(OSError, obj.read, 1)
                self.assertRaises(OSError, obj.read)
            else:
                obj.read(1)
                obj.read()

            if not writable:
                self.assertRaises(OSError, obj.write, data)
            else:
                obj.write(data)

            on_windows = sys.platform.startswith("win")
            if on_windows and factory in (pipe_reader, pipe_writer):
                # Pipes seem to appear as seekable on Windows
                continue
            seekable = "s" in abilities
            self.assertEqual(obj.seekable(), seekable)

            if not seekable:
                self.assertRaises(OSError, obj.tell)
                self.assertRaises(OSError, obj.seek, 0)
            else:
                obj.tell()
                obj.seek(0)

            if not (writable and seekable):
                self.assertRaises(OSError, obj.truncate)
                self.assertRaises(OSError, obj.truncate, 0)
            else:
                obj.truncate()
                obj.truncate(0)
498
def test_open_handles_NUL_chars(self):
    # open() must reject a file name containing a NUL character, whether
    # it is given as str or as bytes.
    name_with_nul = 'foo\0bar'
    self.assertRaises(ValueError, self.open, name_with_nul, 'w')

    name_as_bytes = bytes(name_with_nul, 'ascii')
    with warnings.catch_warnings():
        # DeprecationWarning is silenced for the bytes variant.
        warnings.simplefilter("ignore", DeprecationWarning)
        self.assertRaises(ValueError, self.open, name_as_bytes, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100507
def test_raw_file_io(self):
    # Unbuffered file: check the capability flags, then run the shared
    # write and read sequences.
    path = support.TESTFN
    check = self.assertEqual
    with self.open(path, "wb", buffering=0) as f:
        check(f.readable(), False)
        check(f.writable(), True)
        check(f.seekable(), True)
        self.write_ops(f)
    with self.open(path, "rb", buffering=0) as f:
        check(f.readable(), True)
        check(f.writable(), False)
        check(f.seekable(), True)
        self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000519
def test_buffered_file_io(self):
    # Default-buffered file: check the capability flags, then run the
    # shared write sequence and the buffered read sequence.
    path = support.TESTFN
    check = self.assertEqual
    with self.open(path, "wb") as f:
        check(f.readable(), False)
        check(f.writable(), True)
        check(f.seekable(), True)
        self.write_ops(f)
    with self.open(path, "rb") as f:
        check(f.readable(), True)
        check(f.writable(), False)
        check(f.seekable(), True)
        self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000531
def test_readline(self):
    # readline() with no limit, an int limit and None; a float limit must
    # raise TypeError in both binary and text mode.
    path = support.TESTFN
    with self.open(path, "wb") as f:
        f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
    with self.open(path, "rb") as f:
        check = self.assertEqual
        check(f.readline(), b"abc\n")
        check(f.readline(10), b"def\n")
        check(f.readline(2), b"xy")
        check(f.readline(4), b"zzy\n")
        check(f.readline(), b"foo\x00bar\n")
        check(f.readline(None), b"another line")
        self.assertRaises(TypeError, f.readline, 5.3)
    with self.open(path, "r") as f:
        self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000545
def test_raw_bytes_io(self):
    # Run the shared write sequence on an empty BytesIO, then the buffered
    # read sequence on a BytesIO holding the result.
    sink = self.BytesIO()
    self.write_ops(sink)
    written = sink.getvalue()
    self.assertEqual(written, b"hello world\n")
    source = self.BytesIO(written)
    self.read_ops(source, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000553
def test_large_file_ops(self):
    # On Windows and Mac OSX this test consumes large resources; It takes
    # a long time to build the >2GB file and takes >2GB of disk space
    # therefore the resource must be enabled to run this test.
    if sys.platform.startswith('win') or sys.platform == 'darwin':
        support.requires(
            'largefile',
            'test requires %s bytes and a long time to run' % self.LARGE)
    # First unbuffered (buffering passed positionally as 0), then with
    # default buffering.
    for extra in ((0,), ()):
        with self.open(support.TESTFN, "w+b", *extra) as f:
            self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000566
def test_with_open(self):
    # The with statement must close the file on normal exit and when the
    # body raises.
    path = support.TESTFN
    for bufsize in (0, 1, 100):
        f = None
        with self.open(path, "wb", bufsize) as f:
            f.write(b"xxx")
        self.assertEqual(f.closed, True)

        f = None
        try:
            with self.open(path, "wb", bufsize) as f:
                1 / 0
        except ZeroDivisionError:
            self.assertEqual(f.closed, True)
        else:
            self.fail("1/0 didn't raise an exception")
581
Antoine Pitrou08838b62009-01-21 00:55:13 +0000582 # issue 5008
def test_append_mode_tell(self):
    # A file opened for appending must report a position after the
    # existing three bytes.
    path = support.TESTFN
    with self.open(path, "wb") as f:
        f.write(b"xxx")
    with self.open(path, "ab", buffering=0) as f:
        position = f.tell()
        self.assertEqual(position, 3)
    with self.open(path, "ab") as f:
        position = f.tell()
        self.assertEqual(position, 3)
    with self.open(path, "a") as f:
        position = f.tell()
        self.assertGreater(position, 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000592
def test_destructor(self):
    # Dropping an open FileIO subclass must run __del__, close() and
    # flush() in that order, and the written data must be in the file.
    record = []

    class MyFileIO(self.FileIO):
        def __del__(self):
            record.append(1)
            # Chain to the parent's __del__ only if it defines one.
            inherited = getattr(super(), "__del__", None)
            if inherited is not None:
                inherited()

        def close(self):
            record.append(2)
            super().close()

        def flush(self):
            record.append(3)
            super().flush()

    with support.check_warnings(('', ResourceWarning)):
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            contents = f.read()
            self.assertEqual(contents, b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000618
def _check_base_destructor(self, base):
    # Subclass *base*, drop an instance, and check that __del__, close()
    # and flush() ran in that order.
    record = []

    class MyIO(base):
        def __init__(self):
            # This exercises the availability of attributes on object
            # destruction.
            # (in the C version, close() is called by the tp_dealloc
            # function, not by __del__)
            self.on_del = 1
            self.on_close = 2
            self.on_flush = 3

        def __del__(self):
            record.append(self.on_del)
            # Chain to the parent's __del__ only if it defines one.
            inherited = getattr(super(), "__del__", None)
            if inherited is not None:
                inherited()

        def close(self):
            record.append(self.on_close)
            super().close()

        def flush(self):
            record.append(self.on_flush)
            super().flush()

    instance = MyIO()
    del instance
    support.gc_collect()
    self.assertEqual(record, [1, 2, 3])
648
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000649 def test_IOBase_destructor(self):
650 self._check_base_destructor(self.IOBase)
651
652 def test_RawIOBase_destructor(self):
653 self._check_base_destructor(self.RawIOBase)
654
655 def test_BufferedIOBase_destructor(self):
656 self._check_base_destructor(self.BufferedIOBase)
657
658 def test_TextIOBase_destructor(self):
659 self._check_base_destructor(self.TextIOBase)
660
def test_close_flushes(self):
    # Data written before the file is closed must be readable afterwards.
    path = support.TESTFN
    with self.open(path, "wb") as f:
        f.write(b"xxx")
    with self.open(path, "rb") as f:
        contents = f.read()
        self.assertEqual(contents, b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000666
def test_array_writes(self):
    # write() and writelines() must accept an array.array of ints on every
    # kind of writable binary object.
    a = array.array('i', range(10))
    n = len(a.tobytes())

    def check(f):
        with f:
            self.assertEqual(f.write(a), n)
            f.writelines((a,))

    # Factories rather than instances, so each object is created only
    # right before it is checked.
    factories = (
        self.BytesIO,
        lambda: self.FileIO(support.TESTFN, "w"),
        lambda: self.BufferedWriter(self.MockRawIO()),
        lambda: self.BufferedRandom(self.MockRawIO()),
        lambda: self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()),
    )
    for make in factories:
        check(make())
Guido van Rossumd4103952007-04-12 05:44:49 +0000679
def test_closefd(self):
    # Opening by file name for writing with closefd=False must raise
    # ValueError.
    path = support.TESTFN
    self.assertRaises(ValueError, self.open, path, 'w', closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000683
def test_read_closed(self):
    # Reading from a closed wrapper opened on a borrowed descriptor must
    # raise ValueError.
    path = support.TESTFN
    with self.open(path, "w") as f:
        f.write("egg\n")
    with self.open(path, "r") as f:
        borrowed = self.open(f.fileno(), "r", closefd=False)
        self.assertEqual(borrowed.read(), "egg\n")
        borrowed.seek(0)
        borrowed.close()
        self.assertRaises(ValueError, borrowed.read)
693
def test_no_closefd_with_filename(self):
    # can't use closefd in combination with a file name
    path = support.TESTFN
    self.assertRaises(ValueError, self.open, path, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000697
def test_closefd_attr(self):
    # The raw layer's closefd attribute must reflect how the file was
    # opened: True by name, False on a descriptor with closefd=False.
    path = support.TESTFN
    with self.open(path, "wb") as f:
        f.write(b"egg\n")
    with self.open(path, "r") as f:
        owner_flag = f.buffer.raw.closefd
        self.assertEqual(owner_flag, True)
        borrowed = self.open(f.fileno(), "r", closefd=False)
        borrowed_flag = borrowed.buffer.raw.closefd
        self.assertEqual(borrowed_flag, False)
705
def test_garbage_collection(self):
    # FileIO objects are collected, and collecting them flushes
    # all data to disk.
    path = support.TESTFN
    with support.check_warnings(('', ResourceWarning)):
        f = self.FileIO(path, "wb")
        f.write(b"abcxxx")
        # Make the object reference itself so it sits in a cycle.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
    self.assertIsNone(wr(), wr)
    with self.open(path, "rb") as f:
        contents = f.read()
        self.assertEqual(contents, b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000719
def test_unbounded_file(self):
    # Issue #1174606: reading from an unbounded stream such as /dev/zero.
    zero = "/dev/zero"
    if not os.path.exists(zero):
        self.skipTest("{0} does not exist".format(zero))
    if sys.maxsize > 0x7FFFFFFF:
        self.skipTest("test can only run in a 32-bit address space")
    if support.real_max_memuse < support._2G:
        self.skipTest("test requires at least 2GB of memory")
    # Raw, buffered and text layers must all raise OverflowError on an
    # unlimited read().
    variants = (("rb", {"buffering": 0}), ("rb", {}), ("r", {}))
    for mode, extra in variants:
        with self.open(zero, mode, **extra) as f:
            self.assertRaises(OverflowError, f.read)
735
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200736 def check_flush_error_on_close(self, *args, **kwargs):
737 # Test that the file is closed despite failed flush
738 # and that flush() is called before file closed.
739 f = self.open(*args, **kwargs)
740 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000741 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200742 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200743 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000744 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200745 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600746 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200747 self.assertTrue(closed) # flush() called
748 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200749 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200750
751 def test_flush_error_on_close(self):
752 # raw file
753 # Issue #5700: io.FileIO calls flush() after file closed
754 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
755 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
756 self.check_flush_error_on_close(fd, 'wb', buffering=0)
757 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
758 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
759 os.close(fd)
760 # buffered io
761 self.check_flush_error_on_close(support.TESTFN, 'wb')
762 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
763 self.check_flush_error_on_close(fd, 'wb')
764 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
765 self.check_flush_error_on_close(fd, 'wb', closefd=False)
766 os.close(fd)
767 # text io
768 self.check_flush_error_on_close(support.TESTFN, 'w')
769 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
770 self.check_flush_error_on_close(fd, 'w')
771 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
772 self.check_flush_error_on_close(fd, 'w', closefd=False)
773 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000774
775 def test_multi_close(self):
776 f = self.open(support.TESTFN, "wb", buffering=0)
777 f.close()
778 f.close()
779 f.close()
780 self.assertRaises(ValueError, f.flush)
781
Antoine Pitrou328ec742010-09-14 18:37:24 +0000782 def test_RawIOBase_read(self):
783 # Exercise the default RawIOBase.read() implementation (which calls
784 # readinto() internally).
785 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
786 self.assertEqual(rawio.read(2), b"ab")
787 self.assertEqual(rawio.read(2), b"c")
788 self.assertEqual(rawio.read(2), b"d")
789 self.assertEqual(rawio.read(2), None)
790 self.assertEqual(rawio.read(2), b"ef")
791 self.assertEqual(rawio.read(2), b"g")
792 self.assertEqual(rawio.read(2), None)
793 self.assertEqual(rawio.read(2), b"")
794
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400795 def test_types_have_dict(self):
796 test = (
797 self.IOBase(),
798 self.RawIOBase(),
799 self.TextIOBase(),
800 self.StringIO(),
801 self.BytesIO()
802 )
803 for obj in test:
804 self.assertTrue(hasattr(obj, "__dict__"))
805
Ross Lagerwall59142db2011-10-31 20:34:46 +0200806 def test_opener(self):
807 with self.open(support.TESTFN, "w") as f:
808 f.write("egg\n")
809 fd = os.open(support.TESTFN, os.O_RDONLY)
810 def opener(path, flags):
811 return fd
812 with self.open("non-existent", "r", opener=opener) as f:
813 self.assertEqual(f.read(), "egg\n")
814
Barry Warsaw480e2852016-06-08 17:47:26 -0400815 def test_bad_opener_negative_1(self):
816 # Issue #27066.
817 def badopener(fname, flags):
818 return -1
819 with self.assertRaises(ValueError) as cm:
820 open('non-existent', 'r', opener=badopener)
821 self.assertEqual(str(cm.exception), 'opener returned -1')
822
823 def test_bad_opener_other_negative(self):
824 # Issue #27066.
825 def badopener(fname, flags):
826 return -2
827 with self.assertRaises(ValueError) as cm:
828 open('non-existent', 'r', opener=badopener)
829 self.assertEqual(str(cm.exception), 'opener returned -2')
830
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200831 def test_fileio_closefd(self):
832 # Issue #4841
833 with self.open(__file__, 'rb') as f1, \
834 self.open(__file__, 'rb') as f2:
835 fileio = self.FileIO(f1.fileno(), closefd=False)
836 # .__init__() must not close f1
837 fileio.__init__(f2.fileno(), closefd=False)
838 f1.readline()
839 # .close() must not close f2
840 fileio.close()
841 f2.readline()
842
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300843 def test_nonbuffered_textio(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200844 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300845 with self.assertRaises(ValueError):
846 self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300847
848 def test_invalid_newline(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200849 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300850 with self.assertRaises(ValueError):
851 self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300852
Martin Panter6bb91f32016-05-28 00:41:57 +0000853 def test_buffered_readinto_mixin(self):
854 # Test the implementation provided by BufferedIOBase
855 class Stream(self.BufferedIOBase):
856 def read(self, size):
857 return b"12345"
858 read1 = read
859 stream = Stream()
860 for method in ("readinto", "readinto1"):
861 with self.subTest(method):
862 buffer = byteslike(5)
863 self.assertEqual(getattr(stream, method)(buffer), 5)
864 self.assertEqual(bytes(buffer), b"12345")
865
Ethan Furmand62548a2016-06-04 14:38:43 -0700866 def test_fspath_support(self):
867 class PathLike:
868 def __init__(self, path):
869 self.path = path
870
871 def __fspath__(self):
872 return self.path
873
874 def check_path_succeeds(path):
875 with self.open(path, "w") as f:
876 f.write("egg\n")
877
878 with self.open(path, "r") as f:
879 self.assertEqual(f.read(), "egg\n")
880
881 check_path_succeeds(PathLike(support.TESTFN))
882 check_path_succeeds(PathLike(support.TESTFN.encode('utf-8')))
883
884 bad_path = PathLike(TypeError)
Brett Cannonc78ca1e2016-06-24 12:03:43 -0700885 with self.assertRaises(TypeError):
Ethan Furmand62548a2016-06-04 14:38:43 -0700886 self.open(bad_path, 'w')
887
888 # ensure that refcounting is correct with some error conditions
889 with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
890 self.open(PathLike(support.TESTFN), 'rwxa')
891
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200892
class CIOTest(IOTest):
    """IOTest cases plus a finalization regression test."""

    def test_IOBase_finalize(self):
        """Issue #12149: finalizing an IOBase subclass caught in a cycle.

        The crash occurred in _PyIOBase_finalize when both a class which
        inherits IOBase and an object of this class are caught in a
        reference cycle and close() is already in the method cache.
        """
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        instance = MyIO()
        instance.obj = instance
        ref = weakref.ref(instance)
        del MyIO
        del instance
        support.gc_collect()
        self.assertIsNone(ref(), ref)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000912
# Inherits every IOTest case unchanged.
# NOTE(review): presumably the attributes that select the pure-Python
# implementation are attached to this class elsewhere in the file -- confirm.
class PyIOTest(IOTest):
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000915
Guido van Rossuma9e20242007-03-08 00:43:48 +0000916
@support.cpython_only
class APIMismatchTest(unittest.TestCase):
    # Compares the attribute sets of pyio.RawIOBase and io.RawIOBase in both
    # directions using support.detect_api_mismatch().

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        missing = support.detect_api_mismatch(
            pyio.RawIOBase, io.RawIOBase, ignore=('__weakref__',))
        self.assertEqual(
            missing, set(),
            msg='Python RawIOBase does not have all C RawIOBase methods')

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        missing = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        self.assertEqual(
            missing, set(),
            msg='C RawIOBase does not have all Python RawIOBase methods')
930
931
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000932class CommonBufferedTests:
933 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
934
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000935 def test_detach(self):
936 raw = self.MockRawIO()
937 buf = self.tp(raw)
938 self.assertIs(buf.detach(), raw)
939 self.assertRaises(ValueError, buf.detach)
940
Benjamin Peterson10e76b62014-12-21 20:51:50 -0600941 repr(buf) # Should still work
942
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000943 def test_fileno(self):
944 rawio = self.MockRawIO()
945 bufio = self.tp(rawio)
946
Ezio Melottib3aedd42010-11-20 19:04:17 +0000947 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000948
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000949 def test_invalid_args(self):
950 rawio = self.MockRawIO()
951 bufio = self.tp(rawio)
952 # Invalid whence
953 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +0200954 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000955
956 def test_override_destructor(self):
957 tp = self.tp
958 record = []
959 class MyBufferedIO(tp):
960 def __del__(self):
961 record.append(1)
962 try:
963 f = super().__del__
964 except AttributeError:
965 pass
966 else:
967 f()
968 def close(self):
969 record.append(2)
970 super().close()
971 def flush(self):
972 record.append(3)
973 super().flush()
974 rawio = self.MockRawIO()
975 bufio = MyBufferedIO(rawio)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000976 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000977 support.gc_collect()
Martin Panter754aab22016-03-31 07:21:56 +0000978 self.assertEqual(record, [1, 2, 3])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000979
980 def test_context_manager(self):
981 # Test usability as a context manager
982 rawio = self.MockRawIO()
983 bufio = self.tp(rawio)
984 def _with():
985 with bufio:
986 pass
987 _with()
988 # bufio should now be closed, and using it a second time should raise
989 # a ValueError.
990 self.assertRaises(ValueError, _with)
991
992 def test_error_through_destructor(self):
993 # Test that the exception state is not modified by a destructor,
994 # even if close() fails.
995 rawio = self.CloseFailureIO()
996 def f():
997 self.tp(rawio).xyzzy
998 with support.captured_output("stderr") as s:
999 self.assertRaises(AttributeError, f)
1000 s = s.getvalue().strip()
1001 if s:
1002 # The destructor *may* have printed an unraisable error, check it
1003 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001004 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001005 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +00001006
Antoine Pitrou716c4442009-05-23 19:04:03 +00001007 def test_repr(self):
1008 raw = self.MockRawIO()
1009 b = self.tp(raw)
Serhiy Storchaka521e5862014-07-22 15:00:37 +03001010 clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001011 self.assertEqual(repr(b), "<%s>" % clsname)
1012 raw.name = "dummy"
1013 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
1014 raw.name = b"dummy"
1015 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
1016
Antoine Pitrou6be88762010-05-03 16:48:20 +00001017 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001018 # Test that buffered file is closed despite failed flush
1019 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00001020 raw = self.MockRawIO()
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001021 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00001022 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001023 closed[:] = [b.closed, raw.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001024 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00001025 raw.flush = bad_flush
1026 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001027 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001028 self.assertTrue(b.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001029 self.assertTrue(raw.closed)
1030 self.assertTrue(closed) # flush() called
1031 self.assertFalse(closed[0]) # flush() called before file closed
1032 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02001033 raw.flush = lambda: None # break reference loop
Benjamin Peterson68623612012-12-20 11:53:11 -06001034
1035 def test_close_error_on_close(self):
1036 raw = self.MockRawIO()
1037 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001038 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -06001039 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001040 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -06001041 raw.close = bad_close
1042 b = self.tp(raw)
1043 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001044 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001045 b.close()
1046 self.assertEqual(err.exception.args, ('close',))
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001047 self.assertIsInstance(err.exception.__context__, OSError)
Benjamin Peterson68623612012-12-20 11:53:11 -06001048 self.assertEqual(err.exception.__context__.args, ('flush',))
1049 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00001050
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001051 def test_nonnormalized_close_error_on_close(self):
1052 # Issue #21677
1053 raw = self.MockRawIO()
1054 def bad_flush():
1055 raise non_existing_flush
1056 def bad_close():
1057 raise non_existing_close
1058 raw.close = bad_close
1059 b = self.tp(raw)
1060 b.flush = bad_flush
1061 with self.assertRaises(NameError) as err: # exception not swallowed
1062 b.close()
1063 self.assertIn('non_existing_close', str(err.exception))
1064 self.assertIsInstance(err.exception.__context__, NameError)
1065 self.assertIn('non_existing_flush', str(err.exception.__context__))
1066 self.assertFalse(b.closed)
1067
Antoine Pitrou6be88762010-05-03 16:48:20 +00001068 def test_multi_close(self):
1069 raw = self.MockRawIO()
1070 b = self.tp(raw)
1071 b.close()
1072 b.close()
1073 b.close()
1074 self.assertRaises(ValueError, b.flush)
1075
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001076 def test_unseekable(self):
1077 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1078 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1079 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1080
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00001081 def test_readonly_attributes(self):
1082 raw = self.MockRawIO()
1083 buf = self.tp(raw)
1084 x = self.MockRawIO()
1085 with self.assertRaises(AttributeError):
1086 buf.raw = x
1087
Guido van Rossum78892e42007-04-06 17:31:18 +00001088
class SizeofTest:
    # Mixin: relies on self.tp and self.MockRawIO being supplied by the
    # concrete test class it is combined with.

    @support.cpython_only
    def test_sizeof(self):
        """sys.getsizeof() grows by exactly the difference in buffer size."""
        small, large = 4096, 8192
        bufio = self.tp(self.MockRawIO(), buffer_size=small)
        # Size of the object with its buffer subtracted out.
        overhead = sys.getsizeof(bufio) - small
        bufio = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(bufio), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        """After close(), the buffer no longer counts towards getsizeof()."""
        bufsize = 4096
        bufio = self.tp(self.MockRawIO(), buffer_size=bufsize)
        overhead = sys.getsizeof(bufio) - bufsize
        bufio.close()
        self.assertEqual(sys.getsizeof(bufio), overhead)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001110
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001111class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1112 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001113
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001114 def test_constructor(self):
1115 rawio = self.MockRawIO([b"abc"])
1116 bufio = self.tp(rawio)
1117 bufio.__init__(rawio)
1118 bufio.__init__(rawio, buffer_size=1024)
1119 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001120 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001121 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1122 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1123 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1124 rawio = self.MockRawIO([b"abc"])
1125 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001126 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001127
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001128 def test_uninitialized(self):
1129 bufio = self.tp.__new__(self.tp)
1130 del bufio
1131 bufio = self.tp.__new__(self.tp)
1132 self.assertRaisesRegex((ValueError, AttributeError),
1133 'uninitialized|has no attribute',
1134 bufio.read, 0)
1135 bufio.__init__(self.MockRawIO())
1136 self.assertEqual(bufio.read(0), b'')
1137
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001138 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001139 for arg in (None, 7):
1140 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1141 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001142 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001143 # Invalid args
1144 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001145
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001146 def test_read1(self):
1147 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1148 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001149 self.assertEqual(b"a", bufio.read(1))
1150 self.assertEqual(b"b", bufio.read1(1))
1151 self.assertEqual(rawio._reads, 1)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001152 self.assertEqual(b"", bufio.read1(0))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001153 self.assertEqual(b"c", bufio.read1(100))
1154 self.assertEqual(rawio._reads, 1)
1155 self.assertEqual(b"d", bufio.read1(100))
1156 self.assertEqual(rawio._reads, 2)
1157 self.assertEqual(b"efg", bufio.read1(100))
1158 self.assertEqual(rawio._reads, 3)
1159 self.assertEqual(b"", bufio.read1(100))
1160 self.assertEqual(rawio._reads, 4)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001161
1162 def test_read1_arbitrary(self):
1163 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1164 bufio = self.tp(rawio)
1165 self.assertEqual(b"a", bufio.read(1))
1166 self.assertEqual(b"bc", bufio.read1())
1167 self.assertEqual(b"d", bufio.read1())
1168 self.assertEqual(b"efg", bufio.read1(-1))
1169 self.assertEqual(rawio._reads, 3)
1170 self.assertEqual(b"", bufio.read1())
1171 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001172
1173 def test_readinto(self):
1174 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1175 bufio = self.tp(rawio)
1176 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001177 self.assertEqual(bufio.readinto(b), 2)
1178 self.assertEqual(b, b"ab")
1179 self.assertEqual(bufio.readinto(b), 2)
1180 self.assertEqual(b, b"cd")
1181 self.assertEqual(bufio.readinto(b), 2)
1182 self.assertEqual(b, b"ef")
1183 self.assertEqual(bufio.readinto(b), 1)
1184 self.assertEqual(b, b"gf")
1185 self.assertEqual(bufio.readinto(b), 0)
1186 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +02001187 rawio = self.MockRawIO((b"abc", None))
1188 bufio = self.tp(rawio)
1189 self.assertEqual(bufio.readinto(b), 2)
1190 self.assertEqual(b, b"ab")
1191 self.assertEqual(bufio.readinto(b), 1)
1192 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001193
Benjamin Petersona96fea02014-06-22 14:17:44 -07001194 def test_readinto1(self):
1195 buffer_size = 10
1196 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1197 bufio = self.tp(rawio, buffer_size=buffer_size)
1198 b = bytearray(2)
1199 self.assertEqual(bufio.peek(3), b'abc')
1200 self.assertEqual(rawio._reads, 1)
1201 self.assertEqual(bufio.readinto1(b), 2)
1202 self.assertEqual(b, b"ab")
1203 self.assertEqual(rawio._reads, 1)
1204 self.assertEqual(bufio.readinto1(b), 1)
1205 self.assertEqual(b[:1], b"c")
1206 self.assertEqual(rawio._reads, 1)
1207 self.assertEqual(bufio.readinto1(b), 2)
1208 self.assertEqual(b, b"de")
1209 self.assertEqual(rawio._reads, 2)
1210 b = bytearray(2*buffer_size)
1211 self.assertEqual(bufio.peek(3), b'fgh')
1212 self.assertEqual(rawio._reads, 3)
1213 self.assertEqual(bufio.readinto1(b), 6)
1214 self.assertEqual(b[:6], b"fghjkl")
1215 self.assertEqual(rawio._reads, 4)
1216
1217 def test_readinto_array(self):
1218 buffer_size = 60
1219 data = b"a" * 26
1220 rawio = self.MockRawIO((data,))
1221 bufio = self.tp(rawio, buffer_size=buffer_size)
1222
1223 # Create an array with element size > 1 byte
1224 b = array.array('i', b'x' * 32)
1225 assert len(b) != 16
1226
1227 # Read into it. We should get as many *bytes* as we can fit into b
1228 # (which is more than the number of elements)
1229 n = bufio.readinto(b)
1230 self.assertGreater(n, len(b))
1231
1232 # Check that old contents of b are preserved
1233 bm = memoryview(b).cast('B')
1234 self.assertLess(n, len(bm))
1235 self.assertEqual(bm[:n], data[:n])
1236 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1237
1238 def test_readinto1_array(self):
1239 buffer_size = 60
1240 data = b"a" * 26
1241 rawio = self.MockRawIO((data,))
1242 bufio = self.tp(rawio, buffer_size=buffer_size)
1243
1244 # Create an array with element size > 1 byte
1245 b = array.array('i', b'x' * 32)
1246 assert len(b) != 16
1247
1248 # Read into it. We should get as many *bytes* as we can fit into b
1249 # (which is more than the number of elements)
1250 n = bufio.readinto1(b)
1251 self.assertGreater(n, len(b))
1252
1253 # Check that old contents of b are preserved
1254 bm = memoryview(b).cast('B')
1255 self.assertLess(n, len(bm))
1256 self.assertEqual(bm[:n], data[:n])
1257 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1258
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001259 def test_readlines(self):
1260 def bufio():
1261 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1262 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001263 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1264 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1265 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001266
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001267 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001268 data = b"abcdefghi"
1269 dlen = len(data)
1270
1271 tests = [
1272 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1273 [ 100, [ 3, 3, 3], [ dlen ] ],
1274 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1275 ]
1276
1277 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001278 rawio = self.MockFileIO(data)
1279 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001280 pos = 0
1281 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001282 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001283 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001284 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001285 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001286
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001287 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +00001288 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001289 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1290 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001291 self.assertEqual(b"abcd", bufio.read(6))
1292 self.assertEqual(b"e", bufio.read(1))
1293 self.assertEqual(b"fg", bufio.read())
1294 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +02001295 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001296 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +00001297
Victor Stinnera80987f2011-05-25 22:47:16 +02001298 rawio = self.MockRawIO((b"a", None, None))
1299 self.assertEqual(b"a", rawio.readall())
1300 self.assertIsNone(rawio.readall())
1301
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001302 def test_read_past_eof(self):
1303 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1304 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001305
Ezio Melottib3aedd42010-11-20 19:04:17 +00001306 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001307
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001308 def test_read_all(self):
1309 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1310 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001311
Ezio Melottib3aedd42010-11-20 19:04:17 +00001312 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001313
Victor Stinner45df8202010-04-28 22:31:17 +00001314 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001315 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001316 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001317 try:
1318 # Write out many bytes with exactly the same number of 0's,
1319 # 1's... 255's. This will help us check that concurrent reading
1320 # doesn't duplicate or forget contents.
1321 N = 1000
1322 l = list(range(256)) * N
1323 random.shuffle(l)
1324 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001325 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +00001326 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001327 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001328 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001329 errors = []
1330 results = []
1331 def f():
1332 try:
1333 # Intra-buffer read then buffer-flushing read
1334 for n in cycle([1, 19]):
1335 s = bufio.read(n)
1336 if not s:
1337 break
1338 # list.append() is atomic
1339 results.append(s)
1340 except Exception as e:
1341 errors.append(e)
1342 raise
1343 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001344 with support.start_threads(threads):
1345 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001346 self.assertFalse(errors,
1347 "the following exceptions were caught: %r" % errors)
1348 s = b''.join(results)
1349 for i in range(256):
1350 c = bytes(bytearray([i]))
1351 self.assertEqual(s.count(c), N)
1352 finally:
1353 support.unlink(support.TESTFN)
1354
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001355 def test_unseekable(self):
1356 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1357 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1358 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1359 bufio.read(1)
1360 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1361 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1362
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001363 def test_misbehaved_io(self):
1364 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1365 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001366 self.assertRaises(OSError, bufio.seek, 0)
1367 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001368
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001369 def test_no_extraneous_read(self):
1370 # Issue #9550; when the raw IO object has satisfied the read request,
1371 # we should not issue any additional reads, otherwise it may block
1372 # (e.g. socket).
1373 bufsize = 16
1374 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1375 rawio = self.MockRawIO([b"x" * n])
1376 bufio = self.tp(rawio, bufsize)
1377 self.assertEqual(bufio.read(n), b"x" * n)
1378 # Simple case: one raw read is enough to satisfy the request.
1379 self.assertEqual(rawio._extraneous_reads, 0,
1380 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1381 # A more complex case where two raw reads are needed to satisfy
1382 # the request.
1383 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1384 bufio = self.tp(rawio, bufsize)
1385 self.assertEqual(bufio.read(n), b"x" * n)
1386 self.assertEqual(rawio._extraneous_reads, 0,
1387 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1388
Berker Peksagd10d6ae2015-05-12 17:01:05 +03001389 def test_read_on_closed(self):
1390 # Issue #23796
1391 b = io.BufferedReader(io.BytesIO(b"12"))
1392 b.read(1)
1393 b.close()
1394 self.assertRaises(ValueError, b.peek)
1395 self.assertRaises(ValueError, b.read1, 1)
1396
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001397
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    """BufferedReaderTest and SizeofTest cases run against io.BufferedReader."""

    tp = io.BufferedReader

    def test_constructor(self):
        """Run the shared constructor test, then try an enormous buffer size."""
        super().test_constructor()
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            raw = self.MockRawIO()
            reader = self.tp(raw)
            with self.assertRaises((OverflowError, MemoryError, ValueError)):
                reader.__init__(raw, sys.maxsize)

    def test_initialization(self):
        """After a failed __init__(), read() raises ValueError."""
        raw = self.MockRawIO([b"abc"])
        reader = self.tp(raw)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                reader.__init__(raw, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                reader.read()

    def test_misbehaved_io_read(self):
        """read() raises OSError over a MisbehavedRawIO stream."""
        reader = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        with self.assertRaises(OSError):
            reader.read(10)

    def test_garbage_collection(self):
        """A BufferedReader caught in a reference cycle is collected."""
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            reader = self.tp(rawio)
            # Self-reference so the object is part of a reference cycle.
            reader.f = reader
            ref = weakref.ref(reader)
            del reader
            support.gc_collect()
        self.assertIsNone(ref(), ref)

    def test_args_error(self):
        """Issue #17275: the TypeError for too many arguments names the type."""
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1444
1445
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the BufferedReaderTest suite against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001448
Guido van Rossuma9e20242007-03-08 00:43:48 +00001449
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Tests for buffered writers.  Subclasses set ``tp`` to the
    # implementation under test.
    write_mode = "wb"

    def test_constructor(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-running __init__ with valid sizes must keep the object usable.
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        # Zero and negative buffer sizes are rejected.
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_uninitialized(self):
        # Creating and dropping an uninitialized object must not fail.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.write, b'')
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.write(b''), 0)

    def test_detach_flush(self):
        # detach() must push buffered data down to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)
        buffer = bytearray(b"def")
        bufio.write(buffer)
        buffer[:] = b"***"  # Overwrite our copy of the data
        bufio.flush()
        self.assertEqual(b"".join(writer._write_stack), b"abcdef")

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is called after every single write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for _ in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seek around with absolute offsets and end up where we started.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        # Same, using relative seeks for the intermediate steps.
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
                         b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        lines = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(lines)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        lines = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(lines)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)
        self.assertRaises(TypeError, bufio.writelines, 'abc')

    def test_destructor(self):
        # Dropping the last reference must flush pending data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        #
        # The test creates support.TESTFN on disk; make sure it is removed
        # again whether the test passes or fails.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02)  # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # The chunks may land in any order, so only byte counts are
            # compared.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)
        self.assertRaises(OSError, bufio.write, b"abcdef")

    def test_max_buffer_size_removal(self):
        # A third positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()
        def bad_write(b):
            raise OSError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(OSError, b.close)  # exception not swallowed
        self.assertTrue(b.closed)
1709
Benjamin Peterson59406a92009-03-26 17:10:29 +00001710
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the BufferedWriterTest and SizeofTest suites against
    # io.BufferedWriter and adds the checks below.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                              bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After __init__ rejects a buffer size, write() on the same object
        # must raise ValueError as well.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # The test creates support.TESTFN on disk; make sure it is removed
        # again whether the test passes or fails.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            f.write(b"123xxx")
            # Reference cycle: only the cyclic GC can reclaim the object.
            f.x = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275: the TypeError raised for too many arguments must
        # mention "BufferedWriter".
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1754
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001755
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the BufferedWriterTest suite against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001758
class BufferedRWPairTest(unittest.TestCase):
    # Tests for BufferedRWPair.  Subclasses set ``tp`` to the
    # implementation under test.

    def test_constructor(self):
        # A freshly built pair starts out open.
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.closed)

    def test_uninitialized(self):
        # An object created through __new__ alone rejects I/O until
        # __init__ has been run on it.
        cls = self.tp
        rw_pair = cls.__new__(cls)
        del rw_pair
        rw_pair = cls.__new__(cls)
        errors = (ValueError, AttributeError)
        pattern = 'uninitialized|has no attribute'
        self.assertRaisesRegex(errors, pattern, rw_pair.read, 0)
        self.assertRaisesRegex(errors, pattern, rw_pair.write, b'')
        rw_pair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(rw_pair.read(0), b'')
        self.assertEqual(rw_pair.write(b''), 0)

    def test_detach(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        with self.assertRaises(self.UnsupportedOperation):
            rw_pair.detach()

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument is rejected with TypeError.
        self.assertRaises(TypeError, self.tp,
                          self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        with self.assertRaises(OSError):
            self.tp(NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        with self.assertRaises(OSError):
            self.tp(self.MockRawIO(), NotWriteable())

    def test_read(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        for args, expected in (((3,), b"abc"), ((1,), b"d"), ((), b"ef")):
            self.assertEqual(rw_pair.read(*args), expected)
        # read(None) reads everything, like read() without arguments.
        rw_pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(rw_pair.read(None), b"abc")

    def test_readlines(self):
        # Every call below works on a brand new pair over the same data.
        def fresh_pair():
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

        all_lines = [b"abc\n", b"def\n", b"h"]
        self.assertEqual(fresh_pair().readlines(), all_lines)
        self.assertEqual(fresh_pair().readlines(), all_lines)
        self.assertEqual(fresh_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(rw_pair.read1(3), b"abc")
        self.assertEqual(rw_pair.read1(), b"def")

    def test_readinto(self):
        for method_name in ("readinto", "readinto1"):
            with self.subTest(method_name):
                rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

                target = byteslike(b'\0' * 5)
                fill = getattr(rw_pair, method_name)
                self.assertEqual(fill(target), 5)
                self.assertEqual(bytes(target), b"abcde")

    def test_write(self):
        sink = self.MockRawIO()
        rw_pair = self.tp(self.MockRawIO(), sink)

        rw_pair.write(b"abc")
        rw_pair.flush()
        scratch = bytearray(b"def")
        rw_pair.write(scratch)
        scratch[:] = b"***"  # Overwrite our copy of the data
        rw_pair.flush()
        self.assertEqual(sink._write_stack, [b"abc", b"def"])

    def test_peek(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        peeked = rw_pair.peek(3)
        self.assertTrue(peeked.startswith(b"abc"))
        # Peeking must not consume the data.
        self.assertEqual(rw_pair.read(3), b"abc")

    def test_readable(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw_pair.readable())

    def test_writeable(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw_pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.closed)
        rw_pair.close()
        self.assertTrue(rw_pair.closed)

    def test_reader_close_error_on_close(self):
        # The reader's close() fails with a NameError; the writer must still
        # end up closed.
        def failing_close():
            reader_non_existing
        reader = self.MockRawIO()
        reader.close = failing_close
        writer = self.MockRawIO()
        rw_pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            rw_pair.close()
        self.assertIn('reader_non_existing', str(caught.exception))
        self.assertTrue(rw_pair.closed)
        self.assertFalse(reader.closed)
        self.assertTrue(writer.closed)

    def test_writer_close_error_on_close(self):
        # The writer's close() fails with a NameError; the reader must still
        # end up closed.
        def failing_close():
            writer_non_existing
        reader = self.MockRawIO()
        writer = self.MockRawIO()
        writer.close = failing_close
        rw_pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            rw_pair.close()
        self.assertIn('writer_non_existing', str(caught.exception))
        self.assertFalse(rw_pair.closed)
        self.assertTrue(reader.closed)
        self.assertFalse(writer.closed)

    def test_reader_writer_close_error_on_close(self):
        # Both close() calls fail: the reader's error is the one raised and
        # the writer's error is attached as its __context__.
        def failing_reader_close():
            reader_non_existing
        def failing_writer_close():
            writer_non_existing
        reader = self.MockRawIO()
        reader.close = failing_reader_close
        writer = self.MockRawIO()
        writer.close = failing_writer_close
        rw_pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            rw_pair.close()
        raised = caught.exception
        self.assertIn('reader_non_existing', str(raised))
        self.assertIsInstance(raised.__context__, NameError)
        self.assertIn('writer_non_existing', str(raised.__context__))
        self.assertFalse(rw_pair.closed)
        self.assertFalse(reader.closed)
        self.assertFalse(writer.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side is one.
        cases = ((False, False), (True, False), (False, True), (True, True))
        for reader_tty, writer_tty in cases:
            rw_pair = self.tp(SelectableIsAtty(reader_tty),
                              SelectableIsAtty(writer_tty))
            if reader_tty or writer_tty:
                self.assertTrue(rw_pair.isatty())
            else:
                self.assertFalse(rw_pair.isatty())

    def test_weakref_clearing(self):
        # Drop the pair first, then the weak reference that pointed to it.
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        weak = weakref.ref(rw_pair)
        rw_pair = None
        weak = None  # Shouldn't segfault.
1947
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the BufferedRWPairTest suite against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001950
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the BufferedRWPairTest suite against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001953
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001954
1955class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1956 read_mode = "rb+"
1957 write_mode = "wb+"
1958
def test_constructor(self):
    # Run the constructor checks of both parent suites, reader first.
    for parent in (BufferedReaderTest, BufferedWriterTest):
        parent.test_constructor(self)
1962
def test_uninitialized(self):
    # Run the uninitialized-object checks of both parent suites, reader
    # first.
    for parent in (BufferedReaderTest, BufferedWriterTest):
        parent.test_uninitialized(self)
1966
def test_read_and_write(self):
    mock_raw = self.MockRawIO((b"asdf", b"ghjk"))
    stream = self.tp(mock_raw, 8)

    self.assertEqual(b"as", stream.read(2))
    for chunk in (b"ddd", b"eee"):
        stream.write(chunk)
    self.assertFalse(mock_raw._write_stack)  # Buffer writes
    self.assertEqual(b"ghjk", stream.read())
    # By now both writes have reached the raw stream as a single chunk.
    first_raw_write = mock_raw._write_stack[0]
    self.assertEqual(b"dddeee", first_raw_write)
Guido van Rossum01a27522007-03-07 01:00:12 +00001977
def test_seek_and_tell(self):
    backing = self.BytesIO(b"asdfghjkl")
    stream = self.tp(backing)

    self.assertEqual(b"as", stream.read(2))
    self.assertEqual(2, stream.tell())
    stream.seek(0, 0)
    self.assertEqual(b"asdf", stream.read(4))

    # Overwrite four bytes in the middle, then re-read from the start.
    stream.write(b"123f")
    stream.seek(0, 0)
    self.assertEqual(b"asdf123fl", stream.read())
    self.assertEqual(9, stream.tell())
    # whence=2: offset counted from the end of the stream.
    stream.seek(-4, 2)
    self.assertEqual(5, stream.tell())
    # whence=1: offset counted from the current position.
    stream.seek(2, 1)
    self.assertEqual(7, stream.tell())
    self.assertEqual(b"fl", stream.read(11))
    stream.flush()
    self.assertEqual(b"asdf123fl", backing.getvalue())

    # A float offset is rejected.
    with self.assertRaises(TypeError):
        stream.seek(0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00002000
def check_flush_and_read(self, read_func):
    # Shared scenario for the test_flush_and_* tests.  read_func(bufio[, n])
    # must return the bytes it read and advance the stream position.
    backing = self.BytesIO(b"abcdefghi")
    stream = self.tp(backing)

    self.assertEqual(b"ab", read_func(stream, 2))
    stream.write(b"12")
    self.assertEqual(b"ef", read_func(stream, 2))
    self.assertEqual(6, stream.tell())
    stream.flush()
    self.assertEqual(6, stream.tell())
    self.assertEqual(b"ghi", read_func(stream))
    # Change the raw stream behind the buffered object's back.
    backing.seek(0, 0)
    backing.write(b"XYZ")
    # flush() resets the read buffer
    stream.flush()
    stream.seek(0, 0)
    self.assertEqual(b"XYZ", read_func(stream, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002018
def test_flush_and_read(self):
    # Plain read() is the read function under test.
    def _read(bufio, *args):
        return bufio.read(*args)
    self.check_flush_and_read(_read)
2021
def test_flush_and_readinto(self):
    def _readinto(bufio, n=-1):
        # A negative n means "read everything": use a 9999-byte buffer.
        if n >= 0:
            target = bytearray(n)
        else:
            target = bytearray(9999)
        filled = bufio.readinto(target)
        return bytes(target[:filled])
    self.check_flush_and_read(_readinto)
2028
def test_flush_and_peek(self):
    def _peek(bufio, n=-1):
        # This relies on the fact that the buffer can contain the whole
        # raw stream, otherwise peek() can return less.
        data = bufio.peek(n)
        if n != -1:
            data = data[:n]
        # peek() does not move the position; advance it by hand.
        consumed = len(data)
        bufio.seek(consumed, 1)
        return data
    self.check_flush_and_read(_peek)
2039
def test_flush_and_write(self):
    backing = self.BytesIO(b"abcdefghi")
    stream = self.tp(backing)

    # Two flushed writes in a row overwrite the first five bytes.
    for chunk in (b"123", b"45"):
        stream.write(chunk)
        stream.flush()
    stream.seek(0, 0)
    expected = b"12345fghi"
    self.assertEqual(expected, backing.getvalue())
    self.assertEqual(expected, stream.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002051
def test_threads(self):
    # Run the threading tests of both parent suites, reader first.
    for parent in (BufferedReaderTest, BufferedWriterTest):
        parent.test_threads(self)
2055
def test_writes_and_peek(self):
    # First pass: peek at the current position after every write.
    def _peek_in_place(bufio):
        bufio.peek(1)
    self.check_writes(_peek_in_place)

    # Second pass: step one byte back, peek, then restore the position.
    def _peek_one_back(bufio):
        saved = bufio.tell()
        bufio.seek(-1, 1)
        bufio.peek(1)
        bufio.seek(saved, 0)
    self.check_writes(_peek_one_back)
2066
def test_writes_and_reads(self):
    # After every write, step one byte back and read it again.
    def _reread_last_byte(bufio):
        bufio.seek(-1, 1)
        bufio.read(1)
    self.check_writes(_reread_last_byte)
2072
def test_writes_and_read1s(self):
    # After every write, step one byte back and read it again with read1().
    def _reread_last_byte(bufio):
        bufio.seek(-1, 1)
        bufio.read1(1)
    self.check_writes(_reread_last_byte)
2078
def test_writes_and_readintos(self):
    # After every write, step one byte back and read it again with
    # readinto() into a fresh one-byte buffer.
    def _reread_last_byte(bufio):
        bufio.seek(-1, 1)
        one_byte = bytearray(1)
        bufio.readinto(one_byte)
    self.check_writes(_reread_last_byte)
2084
def test_write_after_readahead(self):
    # Issue #6629: writing after the buffer was filled by readahead should
    # first rewind the raw stream.
    for overwrite_size in [1, 5]:
        backing = self.BytesIO(b"A" * 10)
        stream = self.tp(backing, 4)
        # Trigger readahead
        self.assertEqual(stream.read(1), b"A")
        self.assertEqual(stream.tell(), 1)
        # Overwriting should rewind the raw stream if it needs so
        payload = b"B" * overwrite_size
        stream.write(payload)
        end_pos = overwrite_size + 1
        self.assertEqual(stream.tell(), end_pos)
        # If the write size was smaller than the buffer size, flush() and
        # check that rewind happens.
        stream.flush()
        self.assertEqual(stream.tell(), end_pos)
        untouched_tail = b"A" * (9 - overwrite_size)
        self.assertEqual(backing.getvalue(),
                         b"A" + payload + untouched_tail)
2104
def test_write_rewind_write(self):
    # Various combinations of reading / writing / seeking backwards / writing again
    def mutate(bufio, pos1, pos2):
        assert pos2 >= pos1
        # Fill the buffer
        bufio.seek(pos1)
        bufio.read(pos2 - pos1)
        bufio.write(b'\x02')
        # This writes earlier than the previous write, but still inside
        # the buffer.
        bufio.seek(pos1)
        bufio.write(b'\x01')

    original = b"\x80\x81\x82\x83\x84"
    size = len(original)
    for i in range(size):
        for j in range(i, size):
            backing = self.BytesIO(original)
            stream = self.tp(backing, 100)
            mutate(stream, i, j)
            stream.flush()
            # Byte j becomes 2 and byte i becomes 1; when i == j the
            # later write (1) wins.
            expected = bytearray(original)
            expected[j] = 2
            expected[i] = 1
            self.assertEqual(backing.getvalue(), expected,
                             "failed result for i=%d, j=%d" % (i, j))
2130
def test_truncate_after_read_or_write(self):
    """truncate() with no argument must report the logical position, not the raw one."""
    raw = self.BytesIO(b"A" * 10)
    stream = self.tp(raw, 100)
    # The read fills the read buffer.
    self.assertEqual(stream.read(2), b"AA")
    self.assertEqual(stream.truncate(), 2)
    # The write grows the write buffer.
    self.assertEqual(stream.write(b"BB"), 2)
    self.assertEqual(stream.truncate(), 4)
2138
def test_misbehaved_io(self):
    """Run both the reader and the writer flavours of test_misbehaved_io on this object."""
    for base in (BufferedReaderTest, BufferedWriterTest):
        base.test_misbehaved_io(self)
2142
def test_interleaved_read_write(self):
    """Issue #12213: alternating writes and reads must keep a single file position."""
    # Write one byte, then read the next one with each read flavour in turn.
    with self.BytesIO(b'abcdefgh') as raw, self.tp(raw, 100) as stream:
        stream.write(b"1")
        self.assertEqual(stream.read(1), b'b')
        stream.write(b'2')
        self.assertEqual(stream.read1(1), b'd')
        stream.write(b'3')
        target = bytearray(1)
        stream.readinto(target)
        self.assertEqual(target, b'f')
        stream.write(b'4')
        self.assertEqual(stream.peek(1), b'h')
        stream.flush()
        self.assertEqual(raw.getvalue(), b'1b2d3f4h')

    # Same idea, but starting with a read.
    with self.BytesIO(b'abc') as raw, self.tp(raw, 100) as stream:
        self.assertEqual(stream.read(1), b'a')
        stream.write(b"2")
        self.assertEqual(stream.read(1), b'c')
        stream.flush()
        self.assertEqual(raw.getvalue(), b'a2c')
2167
def test_interleaved_readline_write(self):
    """Alternate one-byte writes with readline() and check the resulting raw bytes."""
    with self.BytesIO(b'ab\ncdef\ng\n') as raw, self.tp(raw) as stream:
        # Each write replaces the first byte of a line; readline() returns the rest.
        for marker, rest_of_line in ((b'1', b'b\n'),
                                     (b'2', b'def\n'),
                                     (b'3', b'\n')):
            stream.write(marker)
            self.assertEqual(stream.readline(), rest_of_line)
        stream.flush()
        self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2179
# You can't construct a BufferedRandom over a non-seekable stream, so the
# name is rebound to None here (presumably masking a test_unseekable
# inherited from a base class -- the class header is not visible in this
# chunk, TODO confirm).
test_unseekable = None
2182
R David Murray67bfe802013-02-23 21:51:05 -05002183
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    """BufferedRandomTest bound to io.BufferedRandom, plus checks specific to it."""

    tp = io.BufferedRandom

    def test_constructor(self):
        """Run the generic constructor test, then try an absurdly large buffer size."""
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel, so only check larger builds.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises((OverflowError, MemoryError, ValueError),
                          bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        """Run both the reader and the writer garbage-collection tests."""
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)

    def test_args_error(self):
        """Issue #17275: a TypeError for too many arguments must name BufferedRandom."""
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
2205
2206
class PyBufferedRandomTest(BufferedRandomTest):
    """BufferedRandomTest with ``tp`` bound to pyio.BufferedRandom."""
    tp = pyio.BufferedRandom
2209
2210
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
#   - A single output character can correspond to many bytes of input.
#   - The number of input bytes to complete the character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
#   - A single input byte can correspond to many characters of output.
#   - The number of output characters can be undetermined until the
#     last input byte is received.
#   - The number of output characters can vary depending on previous input.
2221
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    # Class-wide switch read by lookupTestDecoder(); the codec is only
    # handed out while this is true.
    codecEnabled = False

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and no buffered input."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered_bytes, flags), with flags == 0 right after reset()."""
        # XOR with 1 maps the initial I = O = 1 to 0, so that flags = 0
        # after reset().
        i, o = self.i ^ 1, self.o ^ 1
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a (buffered_bytes, flags) pair produced by getstate()."""
        # The second item is called `flags` rather than `io` so that it
        # does not shadow the io module name inside this method.
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input* and return the text of all completed words.

        When *final* is true, a partially buffered word is completed too.
        """
        pieces = []
        period = ord('.')
        for b in input:
            # self.i is re-read on every byte because process_word() may
            # change it in the middle of the input.
            if self.i == 0:  # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return its output text.

        'i...' and 'o...' control words update I and O and produce no
        output (an empty string is returned).
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o  # pad out with hyphens
            if self.o:
                output = output[:self.o]  # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: return a CodecInfo for 'test_decoder', else None.

        The codec is only returned while ``codecEnabled`` is true.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
2306
# Register the previous decoder for testing.
# Disabled by default (StatefulIncrementalDecoder.codecEnabled is False);
# tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2310
2311
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag passed to decode(), expected text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.'),
    ]

    def test_decoder(self):
        """Decode every test case in one shot, then check a deferred EOF."""
        for data, final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, final), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', True), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002354
2355class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002356
def setUp(self):
    """Store mixed-newline sample data and remove any leftover TESTFN file."""
    support.unlink(support.TESTFN)
    # The same five lines with "\r\n", "\r" and "\n" endings mixed ...
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    # ... and as text with every ending normalized to "\n".
    self.normalized = "AAA\nBBB\nCCC\nDDD\nEEE\n"
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002361
def tearDown(self):
    """Remove the TESTFN file that a test may have created."""
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002364
def test_constructor(self):
    """Re-running __init__ must update encoding/line_buffering and validate newline."""
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)
    reinit_cases = (
        # (encoding, extra keyword arguments, expected line_buffering)
        ("latin-1", {"newline": "\r\n"}, False),
        ("utf-8", {"line_buffering": True}, True),
    )
    for encoding, extra, line_buffering in reinit_cases:
        text.__init__(buffered, encoding=encoding, **extra)
        self.assertEqual(text.encoding, encoding)
        self.assertEqual(text.line_buffering, line_buffering)
    # The wrapper is now UTF-8, so b"\xc3\xa9" decodes to a single "\xe9".
    self.assertEqual("\xe9\n", text.readline())
    self.assertRaises(TypeError, text.__init__, buffered, newline=42)
    self.assertRaises(ValueError, text.__init__, buffered, newline='xyzzy')
2378
def test_uninitialized(self):
    """An instance made with __new__ only must fail cleanly until __init__ runs."""
    wrapper_type = self.TextIOWrapper
    # Create and immediately drop an instance that was never initialized.
    text = wrapper_type.__new__(wrapper_type)
    del text
    text = wrapper_type.__new__(wrapper_type)
    self.assertRaises(Exception, repr, text)
    self.assertRaisesRegex((ValueError, AttributeError),
                           'uninitialized|has no attribute',
                           text.read, 0)
    # Once initialized, the same object works normally.
    text.__init__(self.MockRawIO())
    self.assertEqual(text.read(0), '')
2389
def test_non_text_encoding_codecs_are_rejected(self):
    """The constructor must reject a codec that is not marked as a text encoding.

    See http://bugs.python.org/issue20404
    """
    buffered = self.BufferedWriter(self.BytesIO())
    with self.assertRaisesRegex(LookupError, "is not a text encoding"):
        self.TextIOWrapper(buffered, encoding="hex")
2398
def test_detach(self):
    """detach() returns the buffer, flushes pending text and can only be called once."""
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    text = self.TextIOWrapper(buffered)
    self.assertIs(text.detach(), buffered)

    text = self.TextIOWrapper(buffered, encoding="ascii")
    text.write("howdy")
    # Nothing has reached the raw stream before detach() ...
    self.assertFalse(raw.getvalue())
    text.detach()
    # ... and everything has afterwards.
    self.assertEqual(raw.getvalue(), b"howdy")
    self.assertRaises(ValueError, text.detach)

    # Operations independent of the detached stream should still work
    repr(text)
    self.assertEqual(text.encoding, "ascii")
    self.assertEqual(text.errors, "strict")
    self.assertFalse(text.line_buffering)
2417
def test_repr(self):
    """repr() must show name and mode once they are set, and survive a detached buffer."""
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    expected = "<%s.TextIOWrapper encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    raw.name = "dummy"
    expected = "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    text.mode = "r"
    expected = ("<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>"
                % modname)
    self.assertEqual(repr(text), expected)

    raw.name = b"dummy"
    expected = ("<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>"
                % modname)
    self.assertEqual(repr(text), expected)

    text.buffer.detach()
    repr(text)  # Should not raise an exception
2437
def test_line_buffering(self):
    """With line_buffering=True, a write containing "\\n" or "\\r" is flushed through."""
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)
    steps = (
        # (text written, raw contents expected right after the write)
        ("X", b""),                  # No flush happened
        ("Y\nZ", b"XY\nZ"),          # All got flushed
        ("A\rB", b"XY\nZA\rB"),
    )
    for chunk, raw_contents in steps:
        text.write(chunk)
        self.assertEqual(raw.getvalue(), raw_contents)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002448
def test_default_encoding(self):
    """Without an explicit encoding, TextIOWrapper uses the current locale encoding."""
    saved_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding to check that TextIOWrapper() uses the current
        # locale encoding and not the user preferred encoding
        for variable in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(variable, None)

        current_locale_encoding = locale.getpreferredencoding(False)
        text = self.TextIOWrapper(self.BytesIO())
        self.assertEqual(text.encoding, current_locale_encoding)
    finally:
        # Put the environment back exactly as it was.
        os.environ.clear()
        os.environ.update(saved_environ)
2466
@support.cpython_only
def test_device_encoding(self):
    """Issue 15989: a fileno() outside the C int range must raise OverflowError."""
    import _testcapi
    raw = self.BytesIO()
    for limit in (_testcapi.INT_MAX, _testcapi.UINT_MAX):
        out_of_range = limit + 1
        raw.fileno = lambda: out_of_range
        self.assertRaises(OverflowError, self.TextIOWrapper, raw)
2476
def test_encoding(self):
    """Check the encoding attribute is always set, and valid."""
    raw = self.BytesIO()
    explicit = self.TextIOWrapper(raw, encoding="utf-8")
    self.assertEqual(explicit.encoding, "utf-8")
    implicit = self.TextIOWrapper(raw)
    self.assertIsNotNone(implicit.encoding)
    # lookup() raises if the default encoding name is not a known codec.
    codecs.lookup(implicit.encoding)
2485
def test_encoding_errors_reading(self):
    """Reading undecodable bytes honours the errors= setting."""
    payload = b"abc\n\xff\n"

    # (1) default and (2) explicit strict: decoding fails.
    for options in ({}, {"errors": "strict"}):
        text = self.TextIOWrapper(self.BytesIO(payload), encoding="ascii",
                                  **options)
        self.assertRaises(UnicodeError, text.read)

    # (3) ignore drops the bad byte, (4) replace substitutes U+FFFD.
    for errors, decoded in (("ignore", "abc\n\n"),
                            ("replace", "abc\n\ufffd\n")):
        text = self.TextIOWrapper(self.BytesIO(payload), encoding="ascii",
                                  errors=errors)
        self.assertEqual(text.read(), decoded)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002503
def test_encoding_errors_writing(self):
    """Writing unencodable text honours the errors= setting."""
    # (1) default and (2) explicit strict: encoding fails.
    for options in ({}, {"errors": "strict"}):
        text = self.TextIOWrapper(self.BytesIO(), encoding="ascii", **options)
        self.assertRaises(UnicodeError, text.write, "\xff")

    # (3) ignore drops the character, (4) replace writes "?" instead.
    for errors, encoded in (("ignore", b"abcdef\n"),
                            ("replace", b"abc?def\n")):
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", errors=errors,
                                  newline="\n")
        text.write("abc\xffdef\n")
        text.flush()
        self.assertEqual(raw.getvalue(), encoded)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002527
def test_newlines(self):
    """Check line splitting for every newline mode, encoding and small buffer size."""
    input_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]

    # (newline argument, lines expected back)
    cases = [
        [None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']],
        ['', input_lines],
        ['\n', ["unix\n", "windows\r\n", "os9\rlast\n", "nonl"]],
        ['\r\n', ["unix\nwindows\r\n", "os9\rlast\nnonl"]],
        ['\r', ["unix\nwindows\r", "\nos9\r", "last\nnonl"]],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )
    joined = ''.join(input_lines)

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        data = joined.encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in cases:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        # Take two characters with read(), then the rest
                        # of the line with readline(), until EOF.
                        got_lines = []
                        for head in iter(lambda: textio.read(2), ''):
                            self.assertEqual(len(head), 2)
                            got_lines.append(head + textio.readline())
                    else:
                        got_lines = list(textio)

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002569
def test_newlines_input(self):
    """readlines() and read() must apply each newline mode consistently."""
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    cases = [
        # (newline argument, lines expected from readlines())
        (None, normalized.decode("ascii").splitlines(keepends=True)),
        ("", testdata.decode("ascii").splitlines(keepends=True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        raw = self.BytesIO(testdata)
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        self.assertEqual(text.readlines(), expected)
        # Rewind and check that one big read() gives the same text.
        text.seek(0)
        self.assertEqual(text.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002585
def test_newlines_output(self):
    """Written "\\n" characters must be translated according to the newline mode."""
    # newline argument -> bytes expected in the underlying stream
    expected_by_newline = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is expected to behave like the platform's os.linesep.
    cases = [(None, expected_by_newline[os.linesep])]
    cases += sorted(expected_by_newline.items())
    for newline, expected in cases:
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        for chunk in ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ"):
            text.write(chunk)
        text.flush()
        self.assertEqual(raw.closed, False)
        self.assertEqual(raw.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002603
def test_destructor(self):
    """Dropping the wrapper must flush pending text and close the underlying stream."""
    seen_at_close = []
    base = self.BytesIO

    class RecordingBytesIO(base):
        def close(self):
            # Capture the contents as they are when close() is called.
            seen_at_close.append(self.getvalue())
            base.close(self)

    raw = RecordingBytesIO()
    text = self.TextIOWrapper(raw, encoding="ascii")
    text.write("abc")
    del text
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002617
def test_override_destructor(self):
    """A subclass sees __del__, close() and flush() called in that order."""
    calls = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The base class may not define __del__ at all.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    text = MyTextIO(self.BytesIO(), encoding="ascii")
    del text
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2640
def test_error_through_destructor(self):
    """The exception state must not be modified by a destructor, even if close() fails."""
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        # The temporary wrapper is destroyed while AttributeError propagates.
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    report = captured.getvalue().strip()
    if not report:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(report.splitlines()), 1)
    self.assertTrue(report.startswith("Exception OSError: "), report)
    self.assertTrue(report.endswith(" ignored"), report)
Guido van Rossum8358db22007-08-18 21:39:55 +00002655
Guido van Rossum9b76da62007-04-11 01:09:03 +00002656 # Systematic tests of the text I/O API
2657
def test_basic_io(self):
    """Exercise write/read/seek/tell on a real file for several chunk sizes and encodings.

    Every file object is managed by a ``with`` block, so it is closed even
    when an assertion fails part-way through; an open handle would
    otherwise leak out of the failed test.
    """
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin-1", "utf-8":  # , "utf-16-be", "utf-16-le":
            # Create the file with three characters in it.
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)

            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Opaque position of end-of-file, reused below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                # Append at the end, then read the appended text back.
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2686
def multi_line_test(self, f, enc):
    """Rewrite *f* with lines of many lengths and check tell() before each readline().

    The file is emptied first.  The position recorded before writing each
    line must equal the position reported before reading it back.
    """
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    period = len(sample)
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build a line of `size` chars.
        line = "".join(sample[k % period] for k in range(size)) + "\n"
        written.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    read_back = []
    while True:
        # tell() has to be taken before the line is consumed.
        position = f.tell()
        line = f.readline()
        if not line:
            break
        read_back.append((position, line))
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002708
def test_telling(self):
    """tell() must match the positions seen while writing, and fail during iteration.

    The file is opened in a ``with`` block so that it is closed even when
    an assertion fails before the end of the test.
    """
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        # Record the position before and after each written line.
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to raise OSError while iterating the file.
            self.assertRaises(OSError, f.tell)
        # Once iteration is over, tell() works again.
        self.assertEqual(f.tell(), p2)
2728
def test_seeking(self):
    """tell() must be correct when a multi-byte character sits near the chunk-size mark."""
    chunk_size = _default_chunk_size()
    # Place the 3-byte character two bytes before the chunk-size mark.
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    line = prefix + u_suffix.encode("utf-8")
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        head = f.read(prefix_size)
        self.assertEqual(head, str(prefix, "ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002745
def test_seeking_too(self):
    """Regression test: tell() after readline() with a chunk size of 2."""
    # One 3-byte UTF-8 character followed by a newline.
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as binary_file:
        binary_file.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as text_file:
        text_file._CHUNK_SIZE  # Just test that it exists
        text_file._CHUNK_SIZE = 2
        text_file.readline()
        # Only checks that tell() does not raise.
        text_file.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002756
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Each case from StatefulIncrementalDecoderTest.test_cases is checked
    # twice: once on its own and once behind a filler prefix.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def check_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every handle is managed by a ``with`` block so it is closed
        # even when one of the assertions below fails.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Rewinding to the cookie must replay the same text.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            check_seek_and_tell_with_data(case_input)

        # Position each test case so that it crosses a chunk boundary.
        for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(case_input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            check_seek_and_tell_with_data(prefix + case_input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002803
def test_encoded_writes(self):
    # Two consecutive writes must produce exactly the bytes of one
    # encode() call over the concatenated text.
    text = "1234567890"
    doubled = text * 2
    codec_names = (
        "utf-16", "utf-16-le", "utf-16-be",
        "utf-32", "utf-32-le", "utf-32-be",
    )
    for codec_name in codec_names:
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding=codec_name)
        # Check if the BOM is written only once (see issue1753).
        for _ in range(2):
            wrapper.write(text)
        # Read the content back twice, rewinding before each pass.
        for _ in range(2):
            wrapper.seek(0)
            self.assertEqual(wrapper.read(), doubled)
        self.assertEqual(raw.getvalue(), doubled.encode(codec_name))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002823
def test_unreadable(self):
    # read() on a wrapper whose buffer reports readable() == False
    # must raise OSError.
    class UnReadable(self.BytesIO):
        def readable(self):
            return False

    wrapper = self.TextIOWrapper(UnReadable())
    with self.assertRaises(OSError):
        wrapper.read()
Benjamin Petersona1b49012009-03-31 23:11:32 +00002830
def test_read_one_by_one(self):
    # Reading a character at a time must still fold "\r\n" into "\n".
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    # iter() with a sentinel stops at the first empty string (EOF).
    collected = "".join(iter(lambda: wrapper.read(1), ""))
    self.assertEqual(collected, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002840
def test_readlines(self):
    # readlines() with no hint, with None, and with a hint of 5.
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]

    self.assertEqual(wrapper.readlines(), every_line)

    wrapper.seek(0)
    self.assertEqual(wrapper.readlines(None), every_line)

    # With a hint of 5 only the first two lines are expected back.
    wrapper.seek(0)
    self.assertEqual(wrapper.readlines(5), every_line[:2])
2848
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002849 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    wrapper = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    # Pull 128 characters at a time until read() returns "" (EOF).
    collected = "".join(iter(lambda: wrapper.read(128), ""))
    self.assertEqual(collected, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002860
def test_writelines(self):
    # writelines() of a plain list writes the items back to back.
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink)
    wrapper.writelines(['ab', 'cd', 'ef'])
    wrapper.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
2868
def test_writelines_userlist(self):
    # Same as test_writelines, but the lines come from a UserList
    # instead of a built-in list.
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink)
    pieces = UserList(['ab', 'cd', 'ef'])
    wrapper.writelines(pieces)
    wrapper.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
2876
def test_writelines_error(self):
    # writelines() must reject non-str items, None, and a bytes object.
    wrapper = self.TextIOWrapper(self.BytesIO())
    for bad_argument in ([1, 2, 3], None, b'abc'):
        with self.assertRaises(TypeError):
            wrapper.writelines(bad_argument)
2882
def test_issue1395_1(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")

    # read one char at a time
    collected = "".join(iter(lambda: wrapper.read(1), ""))
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002894
def test_issue1395_2(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # Read four characters at a time, matching the chunk size, until EOF.
    collected = "".join(iter(lambda: wrapper.read(4), ""))
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002906
def test_issue1395_3(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # Mix sized reads with readline() calls; the list is built in
    # order, so the calls happen in exactly this sequence.
    pieces = [
        wrapper.read(4),
        wrapper.read(4),
        wrapper.readline(),
        wrapper.readline(),
        wrapper.readline(),
    ]
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002917
def test_issue1395_4(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # A sized read followed by a read-to-EOF must cover everything.
    head = wrapper.read(4)
    tail = wrapper.read()
    self.assertEqual(head + tail, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002925
def test_issue1395_5(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # Consume four characters, remember the position, rewind to the
    # start and come back: the next read must continue from there.
    wrapper.read(4)
    bookmark = wrapper.tell()
    wrapper.seek(0)
    wrapper.seek(bookmark)
    self.assertEqual(wrapper.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002935
def test_issue2282(self):
    # The wrapper must report the same seekable() value as its buffer.
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    self.assertEqual(raw.seekable(), wrapper.seekable())
2941
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN
    for codec_name in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec_name) as out:
            out.write('aaa')
            out.tell()  # result unused; the call itself is kept
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'aaa'.encode(codec_name))

        with self.open(path, 'a', encoding=codec_name) as out:
            out.write('xxx')
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'aaaxxx'.encode(codec_name))
Antoine Pitroue4501852009-05-14 18:55:55 +00002956
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for codec_name in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec_name) as out:
            out.write('aaa')
            end_of_text = out.tell()
        with self.open(path, 'r+', encoding=codec_name) as out:
            # Write at the recorded end first, then overwrite the start.
            out.seek(end_of_text)
            out.write('zzz')
            out.seek(0)
            out.write('bbb')
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'bbbzzz'.encode(codec_name))
Antoine Pitroue4501852009-05-14 18:55:55 +00002971
def test_seek_append_bom(self):
    # Same test, but first seek to the start and then to the end
    path = support.TESTFN
    for codec_name in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec_name) as out:
            out.write('aaa')
        with self.open(path, 'a', encoding=codec_name) as out:
            out.seek(0)
            out.seek(0, self.SEEK_END)
            out.write('xxx')
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'aaaxxx'.encode(codec_name))
2984
def test_errors_property(self):
    # .errors is "strict" when not given and echoes an explicit value.
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for open_options, expected in cases:
        with self.open(support.TESTFN, "w", **open_options) as f:
            self.assertEqual(f.errors, expected)
2990
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    start_signal = threading.Event()
    worker_count = 20

    with self.open(support.TESTFN, "w", buffering=1) as f:
        def worker(index):
            line = "Thread%03d\n" % index
            # All workers block here until the event is set.
            start_signal.wait()
            f.write(line)

        workers = []
        for index in range(worker_count):
            workers.append(threading.Thread(target=worker, args=(index,)))
        with support.start_threads(workers, start_signal.set):
            time.sleep(0.02)

    with self.open(support.TESTFN) as f:
        content = f.read()
        # Every worker's line must appear exactly once.
        for index in range(worker_count):
            self.assertEqual(content.count("Thread%03d\n" % index), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003009
def test_flush_error_on_close(self):
    # Test that text file is closed despite failed flush
    # and that flush() is called before file closed.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    state_at_flush = []

    def failing_flush():
        # Record whether wrapper and buffer were already closed.
        state_at_flush[:] = [wrapper.closed, wrapper.buffer.closed]
        raise OSError()

    wrapper.flush = failing_flush
    with self.assertRaises(OSError):  # exception not swallowed
        wrapper.close()
    self.assertTrue(wrapper.closed)
    self.assertTrue(wrapper.buffer.closed)
    self.assertTrue(state_at_flush)      # flush() called
    self.assertFalse(state_at_flush[0])  # flush() called before file closed
    self.assertFalse(state_at_flush[1])
    wrapper.flush = lambda: None  # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003026
def test_close_error_on_close(self):
    # Both flush() and the buffer's close() fail: the close error must
    # surface with the flush error chained as its __context__.
    def failing_flush():
        raise OSError('flush')

    def failing_close():
        raise OSError('close')

    raw = self.BytesIO(self.testdata)
    raw.close = failing_close
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.flush = failing_flush

    with self.assertRaises(OSError) as caught:  # exception not swallowed
        wrapper.close()

    error = caught.exception
    self.assertEqual(error.args, ('close',))
    self.assertIsInstance(error.__context__, OSError)
    self.assertEqual(error.__context__.args, ('flush',))
    self.assertFalse(wrapper.closed)
3042
def test_nonnormalized_close_error_on_close(self):
    # Issue #21677
    # The two helpers raise deliberately undefined names, so each
    # failure surfaces as a NameError mentioning that name.
    def failing_flush():
        raise non_existing_flush

    def failing_close():
        raise non_existing_close

    raw = self.BytesIO(self.testdata)
    raw.close = failing_close
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.flush = failing_flush

    with self.assertRaises(NameError) as caught:  # exception not swallowed
        wrapper.close()

    error = caught.exception
    self.assertIn('non_existing_close', str(error))
    self.assertIsInstance(error.__context__, NameError)
    self.assertIn('non_existing_flush', str(error.__context__))
    self.assertFalse(wrapper.closed)
3059
def test_multi_close(self):
    # Closing repeatedly must be harmless; flush() afterwards must
    # raise ValueError.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    for _ in range(3):
        wrapper.close()
    with self.assertRaises(ValueError):
        wrapper.flush()
3066
def test_unseekable(self):
    # tell() and seek() over a MockUnseekableIO buffer must raise
    # UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.tell()
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.seek(0)
3071
def test_readonly_attributes(self):
    # Assigning to .buffer must raise AttributeError.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    with self.assertRaises(AttributeError):
        wrapper.buffer = replacement
3077
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks),
                                 encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(wrapper.read(4), 'abcd')
    self.assertEqual(wrapper.readline(), 'efghi\n')
    remaining_lines = list(wrapper)
    self.assertEqual(remaining_lines, ['jkl\n', 'opq\n'])
3088
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for fragment in ('1', '23\n4', '5'):
        wrapper.write(fragment)
    # No flush() was issued; the raw object must already hold it all.
    written = b''.join(raw._write_stack)
    self.assertEqual(written, b'123\n45')
3098
def test_bufio_write_through(self):
    # Issue #21396: write_through=True doesn't force a flush()
    # on the underlying binary buffered object.
    flush_called = []
    write_called = []

    class BufferedWriter(self.BufferedWriter):
        # Records every flush()/write() call before delegating.
        def flush(self, *args, **kwargs):
            flush_called.append(True)
            return super().flush(*args, **kwargs)

        def write(self, *args, **kwargs):
            write_called.append(True)
            return super().write(*args, **kwargs)

    rawio = self.BytesIO()
    data = b"a"
    bufio = BufferedWriter(rawio, len(data)*2)
    textio = self.TextIOWrapper(bufio, encoding='ascii',
                                write_through=True)
    # write to the buffered io but don't overflow the buffer
    text = data.decode('ascii')
    textio.write(text)

    # buffer.flush is not called with write_through=True
    self.assertFalse(flush_called)
    # buffer.write *is* called with write_through=True
    self.assertTrue(write_called)
    self.assertEqual(rawio.getvalue(), b"")  # no flush

    write_called.clear()  # reset
    textio.write(text * 10)  # total content is larger than bufio buffer
    self.assertTrue(write_called)
    self.assertEqual(rawio.getvalue(), data * 11)  # all flushed
3130
def test_read_nonbytes(self):
    # Issue #17106
    # Crash when underlying read() returns non-bytes
    # A fresh wrapper is built for each kind of read.
    wrapper = self.TextIOWrapper(self.StringIO('a'))
    with self.assertRaises(TypeError):
        wrapper.read(1)

    wrapper = self.TextIOWrapper(self.StringIO('a'))
    with self.assertRaises(TypeError):
        wrapper.readline()

    wrapper = self.TextIOWrapper(self.StringIO('a'))
    with self.assertRaises(TypeError):
        wrapper.read()
3140
def test_illegal_decoder(self):
    # Issue #17106
    # Bypass the early encoding check added in issue 20404
    def _make_illegal_wrapper():
        codec_info = codecs.lookup("quopri")
        # Temporarily flag the codec as a text encoding; the flag is
        # always put back to False, even if construction fails.
        codec_info._is_text_encoding = True
        try:
            return self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
                                      newline='\n', encoding="quopri")
        finally:
            codec_info._is_text_encoding = False

    # Crash when decoder returns non-string
    wrapper = _make_illegal_wrapper()
    with self.assertRaises(TypeError):
        wrapper.read(1)

    wrapper = _make_illegal_wrapper()
    with self.assertRaises(TypeError):
        wrapper.readline()

    wrapper = _make_illegal_wrapper()
    with self.assertRaises(TypeError):
        wrapper.read()
3160
def _check_create_at_shutdown(self, **kwargs):
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    # The child script keeps an object alive whose __del__ builds a
    # TextIOWrapper with the given keyword arguments and prints "ok".
    template = """if 1:
        import codecs
        import {iomod} as io

        # Avoid looking up codecs at shutdown
        codecs.lookup('utf-8')

        class C:
            def __init__(self):
                self.buf = io.BytesIO()
            def __del__(self):
                io.TextIOWrapper(self.buf, **{kwargs})
                print("ok")
        c = C()
        """
    script = template.format(iomod=self.io.__name__, kwargs=kwargs)
    return assert_python_ok("-c", script)
3181
@support.requires_type_collecting
def test_create_at_shutdown_without_encoding(self):
    rc, out, err = self._check_create_at_shutdown()
    if not err:
        self.assertEqual("ok", out.decode().strip())
        return
    # Can error out with a RuntimeError if the module state
    # isn't found.
    self.assertIn(self.shutdown_error, err.decode())
3191
@support.requires_type_collecting
def test_create_at_shutdown_with_encoding(self):
    # With an explicit encoding and error handler the child script
    # must print "ok" and write nothing to stderr.
    options = {'encoding': 'utf-8', 'errors': 'strict'}
    rc, out, err = self._check_create_at_shutdown(**options)
    self.assertFalse(err)
    self.assertEqual("ok", out.decode().strip())
3198
def test_read_byteslike(self):
    # The underlying stream hands back memoryviews instead of bytes.
    source = MemviewBytesIO(b'Just some random string\n')
    wrapper = self.TextIOWrapper(source, 'utf-8')

    # TextIOwrapper will not read the full string, because
    # we truncate it to a multiple of the native int size
    # so that we can construct a more complex memoryview.
    truncated = _to_memoryview(source.getvalue()).tobytes()
    expected_text = truncated.decode('utf-8')

    self.assertEqual(wrapper.read(200), expected_text)
3209
def test_issue22849(self):
    class F(object):
        # Minimal stand-in that only answers the three capability
        # queries; it has no tell() until one is attached below.
        def readable(self):
            return True

        def writable(self):
            return True

        def seekable(self):
            return True

    # Construct the wrapper repeatedly; any exception is deliberately
    # ignored, only completing the loop matters.
    for _ in range(10):
        try:
            self.TextIOWrapper(F(), encoding='utf-8')
        except Exception:
            pass

    # Once tell() exists, construction is expected to succeed.
    F.tell = lambda x: 0
    t = self.TextIOWrapper(F(), encoding='utf-8')
3224
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003225
class MemviewBytesIO(io.BytesIO):
    '''BytesIO variant whose read() and read1() hand back memoryview
       objects (built by _to_memoryview) instead of bytes.'''

    def read1(self, len_):
        chunk = super().read1(len_)
        return _to_memoryview(chunk)

    def read(self, len_):
        chunk = super().read(len_)
        return _to_memoryview(chunk)
3235
def _to_memoryview(buf):
    '''Return a memoryview over an int array built from *buf*.

    Only the leading part of *buf* whose length is a whole multiple of
    the array item size is used; any trailing bytes are dropped.
    '''
    ints = array.array('i')
    whole_items = len(buf) // ints.itemsize
    ints.frombytes(buf[:whole_items * ints.itemsize])
    return memoryview(ints)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003243
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003244
class CTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapperTest cases against the ``io`` module
    # and adds checks specific to that implementation.
    io = io
    # Text looked for in stderr by
    # test_create_at_shutdown_without_encoding.
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        # Re-running __init__ with a bad newline must fail, and the
        # object must then refuse to read.
        with self.assertRaises(ValueError):
            wrapper.__init__(buffered, newline='xyzzy')
        with self.assertRaises(ValueError):
            wrapper.read()

        # repr() of an instance that never ran __init__ must raise.
        wrapper = self.TextIOWrapper.__new__(self.TextIOWrapper)
        with self.assertRaises(Exception):
            repr(wrapper)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            raw_file = io.FileIO(support.TESTFN, "wb")
            buffered = self.BufferedWriter(raw_file)
            wrapper = self.TextIOWrapper(buffered, encoding="ascii")
            wrapper.write("456def")
            # Self-reference, so the object sits in a reference cycle.
            wrapper.x = wrapper
            ref = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            first_pair = self.BufferedRWPair(self.MockRawIO(),
                                             self.MockRawIO())
            first_text = self.TextIOWrapper(first_pair, encoding="ascii")
            second_pair = self.BufferedRWPair(self.MockRawIO(),
                                              self.MockRawIO())
            second_text = self.TextIOWrapper(second_pair, encoding="ascii")
            # circular references
            first_text.buddy = second_text
            second_text.buddy = first_text
        support.gc_collect()
3289
3290
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapperTest cases against the ``pyio`` module.
    io = pyio
    # Text looked for in stderr by
    # test_create_at_shutdown_without_encoding.
    shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003294
3295
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Shared checks for IncrementalNewlineDecoder.  The class under test
    # is read from ``self.IncrementalNewlineDecoder``, which is not
    # defined in this class body.

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def expect(data, text, **options):
            # We exercise getstate() / setstate() as well as decode()
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(data, **options), text)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(data, **options), text)

        def expect_all(steps):
            # Run a list of (data, text, options) steps in order.
            for data, text, options in steps:
                expect(data, text, **options)

        expect(b'\xe8\xa2\x88', "\u8888")

        # The same character fed one byte at a time, twice in a row.
        for _ in range(2):
            expect(b'\xe8', "")
            expect(b'\xa2', "")
            expect(b'\x88', "\u8888")

        # A dangling lead byte must be rejected once input is final.
        expect(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        final = {'final': True}
        expect_all([
            (b'\n', "\n", {}),
            (b'\r', "", {}),
            (b'', "\n", final),
            (b'\r', "\n", final),

            (b'\r', "", {}),
            (b'a', "\na", {}),

            (b'\r\r\n', "\n\n", {}),
            (b'\r', "", {}),
            (b'\r', "\n", {}),
            (b'\na', "\na", {}),

            (b'\xe8\xa2\x88\r\n', "\u8888\n", {}),
            (b'\xe8\xa2\x88', "\u8888", {}),
            (b'\n', "\n", {}),
            (b'\xe8\xa2\x88\r', "\u8888", {}),
            (b'\n', "\n", {}),
        ])

    def check_newline_decoding(self, decoder, encoding):
        pieces = []
        if encoding is None:
            encoder = None

            def feed(text):
                # Decode one char at a time
                for char in text:
                    pieces.append(decoder.decode(char))
        else:
            encoder = codecs.getincrementalencoder(encoding)()

            def feed(text):
                # Decode one byte at a time
                for byte in encoder.encode(text):
                    pieces.append(decoder.decode(bytes([byte])))

        self.assertEqual(decoder.newlines, None)
        # Each step feeds a fragment and then checks which newline
        # kinds the decoder reports having seen so far.
        steps = [
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        ]
        for fragment, seen in steps:
            feed(fragment)
            self.assertEqual(decoder.newlines, seen)
        feed("abc\r")
        self.assertEqual("".join(pieces), "abc\n\nabcabc\nabcabc")

        # After reset() the newline bookkeeping must start over.
        decoder.reset()
        sample = "abc"
        if encoder is not None:
            encoder.reset()
            sample = encoder.encode(sample)
        self.assertEqual(decoder.decode(sample), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                inner = None
            else:
                inner = codecs.getincrementaldecoder(enc)()
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)

        inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(wrapped)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        # U+0D00 and U+0A00 must pass through unchanged and must not be
        # counted as newlines, with or without translation.
        for translate in (False, True):
            dec = self.IncrementalNewlineDecoder(None, translate=translate)
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)
3400 _check(dec)
3401
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; every test is inherited from
    # IncrementalNewlineDecoderTest.
    pass
3404
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; every test is inherited from
    # IncrementalNewlineDecoderTest.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003407
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003408
Guido van Rossum01a27522007-03-07 01:00:12 +00003409# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003410
Guido van Rossum5abbf752007-08-27 17:39:33 +00003411class MiscIOTest(unittest.TestCase):
3412
def tearDown(self):
    # Remove the scratch file (support.TESTFN) used by the tests.
    support.unlink(support.TESTFN)
3415
def test___all__(self):
    # Every name exported by the io module must exist and be of the
    # expected kind.
    module = self.io
    for name in module.__all__:
        exported = getattr(module, name, None)
        self.assertIsNotNone(exported, name)
        if name == "open":
            continue
        looks_like_error = ("error" in name.lower()
                            or name == "UnsupportedOperation")
        if looks_like_error:
            self.assertTrue(issubclass(exported, Exception), name)
        elif not name.startswith("SEEK_"):
            # Everything else, SEEK_* names aside, must derive from IOBase.
            self.assertTrue(issubclass(exported, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003426
Barry Warsaw40e82462008-11-20 20:14:50 +00003427 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003428 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003429 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003430 f.close()
3431
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003432 with support.check_warnings(('', DeprecationWarning)):
3433 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003434 self.assertEqual(f.name, support.TESTFN)
3435 self.assertEqual(f.buffer.name, support.TESTFN)
3436 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3437 self.assertEqual(f.mode, "U")
3438 self.assertEqual(f.buffer.mode, "rb")
3439 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003440 f.close()
3441
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003442 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003443 self.assertEqual(f.mode, "w+")
3444 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3445 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003446
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003447 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003448 self.assertEqual(g.mode, "wb")
3449 self.assertEqual(g.raw.mode, "wb")
3450 self.assertEqual(g.name, f.fileno())
3451 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003452 f.close()
3453 g.close()
3454
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003455 def test_io_after_close(self):
3456 for kwargs in [
3457 {"mode": "w"},
3458 {"mode": "wb"},
3459 {"mode": "w", "buffering": 1},
3460 {"mode": "w", "buffering": 2},
3461 {"mode": "wb", "buffering": 0},
3462 {"mode": "r"},
3463 {"mode": "rb"},
3464 {"mode": "r", "buffering": 1},
3465 {"mode": "r", "buffering": 2},
3466 {"mode": "rb", "buffering": 0},
3467 {"mode": "w+"},
3468 {"mode": "w+b"},
3469 {"mode": "w+", "buffering": 1},
3470 {"mode": "w+", "buffering": 2},
3471 {"mode": "w+b", "buffering": 0},
3472 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003473 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003474 f.close()
3475 self.assertRaises(ValueError, f.flush)
3476 self.assertRaises(ValueError, f.fileno)
3477 self.assertRaises(ValueError, f.isatty)
3478 self.assertRaises(ValueError, f.__iter__)
3479 if hasattr(f, "peek"):
3480 self.assertRaises(ValueError, f.peek, 1)
3481 self.assertRaises(ValueError, f.read)
3482 if hasattr(f, "read1"):
3483 self.assertRaises(ValueError, f.read1, 1024)
Martin Panterccb2c0e2016-10-20 23:48:14 +00003484 self.assertRaises(ValueError, f.read1)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003485 if hasattr(f, "readall"):
3486 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003487 if hasattr(f, "readinto"):
3488 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003489 if hasattr(f, "readinto1"):
3490 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003491 self.assertRaises(ValueError, f.readline)
3492 self.assertRaises(ValueError, f.readlines)
3493 self.assertRaises(ValueError, f.seek, 0)
3494 self.assertRaises(ValueError, f.tell)
3495 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003496 self.assertRaises(ValueError, f.write,
3497 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003498 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003499 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003500
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003501 def test_blockingioerror(self):
3502 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003503 class C(str):
3504 pass
3505 c = C("")
3506 b = self.BlockingIOError(1, c)
3507 c.b = b
3508 b.c = c
3509 wr = weakref.ref(c)
3510 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003511 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003512 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003513
3514 def test_abcs(self):
3515 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003516 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3517 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3518 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3519 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003520
3521 def _check_abc_inheritance(self, abcmodule):
3522 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003523 self.assertIsInstance(f, abcmodule.IOBase)
3524 self.assertIsInstance(f, abcmodule.RawIOBase)
3525 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3526 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003527 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003528 self.assertIsInstance(f, abcmodule.IOBase)
3529 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3530 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3531 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003532 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003533 self.assertIsInstance(f, abcmodule.IOBase)
3534 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3535 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3536 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003537
3538 def test_abc_inheritance(self):
3539 # Test implementations inherit from their respective ABCs
3540 self._check_abc_inheritance(self)
3541
3542 def test_abc_inheritance_official(self):
3543 # Test implementations inherit from the official ABCs of the
3544 # baseline "io" module.
3545 self._check_abc_inheritance(io)
3546
Antoine Pitroue033e062010-10-29 10:38:18 +00003547 def _check_warn_on_dealloc(self, *args, **kwargs):
3548 f = open(*args, **kwargs)
3549 r = repr(f)
3550 with self.assertWarns(ResourceWarning) as cm:
3551 f = None
3552 support.gc_collect()
3553 self.assertIn(r, str(cm.warning.args[0]))
3554
3555 def test_warn_on_dealloc(self):
3556 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3557 self._check_warn_on_dealloc(support.TESTFN, "wb")
3558 self._check_warn_on_dealloc(support.TESTFN, "w")
3559
3560 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3561 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003562 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003563 for fd in fds:
3564 try:
3565 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003566 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003567 if e.errno != errno.EBADF:
3568 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003569 self.addCleanup(cleanup_fds)
3570 r, w = os.pipe()
3571 fds += r, w
3572 self._check_warn_on_dealloc(r, *args, **kwargs)
3573 # When using closefd=False, there's no warning
3574 r, w = os.pipe()
3575 fds += r, w
Serhiy Storchaka94a619d2016-02-11 13:11:44 +02003576 with support.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00003577 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00003578
3579 def test_warn_on_dealloc_fd(self):
3580 self._check_warn_on_dealloc_fd("rb", buffering=0)
3581 self._check_warn_on_dealloc_fd("rb")
3582 self._check_warn_on_dealloc_fd("r")
3583
3584
Antoine Pitrou243757e2010-11-05 21:15:39 +00003585 def test_pickling(self):
3586 # Pickling file objects is forbidden
3587 for kwargs in [
3588 {"mode": "w"},
3589 {"mode": "wb"},
3590 {"mode": "wb", "buffering": 0},
3591 {"mode": "r"},
3592 {"mode": "rb"},
3593 {"mode": "rb", "buffering": 0},
3594 {"mode": "w+"},
3595 {"mode": "w+b"},
3596 {"mode": "w+b", "buffering": 0},
3597 ]:
3598 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3599 with self.open(support.TESTFN, **kwargs) as f:
3600 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3601
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003602 def test_nonblock_pipe_write_bigbuf(self):
3603 self._test_nonblock_pipe_write(16*1024)
3604
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003605 def test_nonblock_pipe_write_smallbuf(self):
3606 self._test_nonblock_pipe_write(1024)
3607
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003608 @unittest.skipUnless(hasattr(os, 'set_blocking'),
3609 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003610 def _test_nonblock_pipe_write(self, bufsize):
3611 sent = []
3612 received = []
3613 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003614 os.set_blocking(r, False)
3615 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003616
3617 # To exercise all code paths in the C implementation we need
3618 # to play with buffer sizes. For instance, if we choose a
3619 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3620 # then we will never get a partial write of the buffer.
3621 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3622 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3623
3624 with rf, wf:
3625 for N in 9999, 73, 7574:
3626 try:
3627 i = 0
3628 while True:
3629 msg = bytes([i % 26 + 97]) * N
3630 sent.append(msg)
3631 wf.write(msg)
3632 i += 1
3633
3634 except self.BlockingIOError as e:
3635 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003636 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003637 sent[-1] = sent[-1][:e.characters_written]
3638 received.append(rf.read())
3639 msg = b'BLOCKED'
3640 wf.write(msg)
3641 sent.append(msg)
3642
3643 while True:
3644 try:
3645 wf.flush()
3646 break
3647 except self.BlockingIOError as e:
3648 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003649 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003650 self.assertEqual(e.characters_written, 0)
3651 received.append(rf.read())
3652
3653 received += iter(rf.read, None)
3654
3655 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003656 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003657 self.assertTrue(wf.closed)
3658 self.assertTrue(rf.closed)
3659
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003660 def test_create_fail(self):
3661 # 'x' mode fails if file is existing
3662 with self.open(support.TESTFN, 'w'):
3663 pass
3664 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3665
3666 def test_create_writes(self):
3667 # 'x' mode opens for writing
3668 with self.open(support.TESTFN, 'xb') as f:
3669 f.write(b"spam")
3670 with self.open(support.TESTFN, 'rb') as f:
3671 self.assertEqual(b"spam", f.read())
3672
Christian Heimes7b648752012-09-10 14:48:43 +02003673 def test_open_allargs(self):
3674 # there used to be a buffer overflow in the parser for rawmode
3675 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3676
3677
class CMiscIOTest(MiscIOTest):
    # MiscIOTest bound to the `io` module, plus tests that only apply to it.
    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        # read() hands back far more data than the 2-byte target buffer of
        # readinto() can hold; that must be reported as ValueError.
        class BadReader(self.io.BufferedIOBase):
            def read(self, n=-1):
                return b'x' * 10**6
        reader = BadReader()
        target = bytearray(2)
        with self.assertRaises(ValueError):
            reader.readinto(target)

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        #
        # `stream_name` is the name of the sys attribute to write to; it is
        # substituted into the child script below.
        script = """if 1:
            import sys
            import time
            import threading

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """.format(stream_name=stream_name)
        result, _ = run_python_until_end("-c", script)
        stderr_text = result.err.decode()
        if result.rc == 0:
            # Clean exit: stderr may only hold the '.' and '!' characters
            # that the child itself wrote.
            self.assertFalse(stderr_text.strip('.!'))
            return
        # Failure: should be a fatal error
        expected = ("Fatal Python error: could not acquire lock "
                    "for <_io.BufferedWriter name='<{stream_name}>'> "
                    "at interpreter shutdown, possibly due to "
                    "daemon threads".format(stream_name=stream_name))
        self.assertIn(expected, stderr_text)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stdout')

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stderr')
3731
3732
class PyMiscIOTest(MiscIOTest):
    # MiscIOTest bound to the `pyio` module.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003735
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003736
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    # Tests of how file objects behave when a SIGALRM handler runs in the
    # middle of a read or a write.  Subclasses set `io` to the module under
    # test.

    def setUp(self):
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        # Cancel any alarm a failed test left pending *before* restoring the
        # previous handler, so that it cannot fire under that handler.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raise ZeroDivisionError from the signal handler.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        `item` is the unit of data that gets repeated to build the payload,
        `bytes` is what is expected to come out of the pipe, and
        `fdopen_kwargs` are passed on to self.io.open().
        """
        read_results = []
        def _read():
            # Keep SIGALRM away from this helper thread where the platform
            # allows it.
            if hasattr(signal, 'pthread_sigmask'):
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        # Bound before the try block so that the cleanup below does not
        # raise NameError (hiding the real error) when open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed below.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                signal.alarm(0)
                t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    # Issue #22331: The test hangs on FreeBSD 7.2
    @support.requires_freebsd_version(8)
    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        # Write `data` in a loop while a SIGALRM handler writes to the same
        # file object, and check how the reentrant call is reported.
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Make sure on_alarm cannot fire during or after the cleanup.
            signal.alarm(0)
            try:
                wio.close()
            finally:
                # Close the read end even when closing wio fails.
                os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        `decode` converts what read() returned into str for the comparison;
        `fdopen_kwargs` are passed on to self.io.open().
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so that the cleanup below does not
        # raise NameError (hiding the real error) when open() itself fails.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # The handler writes to `w`, so the alarm must not outlive it.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        `item` is a one-element str or bytes that gets repeated to build the
        payload; `fdopen_kwargs` are passed on to self.io.open().
        """
        select = support.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        error = None
        def _read():
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                # Hand the failure over to the main thread, which asserts
                # on it after joining.
                nonlocal error
                error = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: re-arm with the handler that starts the reader.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so that the cleanup below does not
        # raise NameError (hiding the real error) when open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error)
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Neither alarm handler may run once the pipe is being torn down.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3944
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003945
class CSignalsTest(SignalsTest):
    # SignalsTest bound to the `io` module.
    io = io
3948
class PySignalsTest(SignalsTest):
    # SignalsTest bound to the `pyio` module.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.  Rebinding the inherited names to None hides them
    # from this class.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3956
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003957
def load_tests(*args):
    # Build the suite for this module.  Before collecting the tests, copy
    # the namespace of the matching I/O implementation onto every test class
    # whose name starts with "C" (io) or "Py" (pyio).  The arguments are
    # accepted and ignored.
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name : getattr(io, name) for name in all_members}
    py_io_ns = {name : getattr(pyio, name) for name in all_members}
    globs = globals()
    # Each mock is looked up under its "C"/"Py"-prefixed global name and
    # exposed to the tests under the unprefixed name.
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # unittest.makeSuite() is deprecated (and removed in Python 3.13); the
    # default loader collects the same "test*" methods.
    loader = unittest.defaultTestLoader
    suite = unittest.TestSuite([loader.loadTestsFromTestCase(test)
                                for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003993
# Run the tests when this file is executed as a script.
if "__main__" == __name__:
    unittest.main()