blob: 32f76a6c627186f218f976d5dc08a9868a779bf7 [file] [log] [blame]
"""Unit tests for the io module."""

# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)
12
################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Berker Peksagce643912015-05-06 06:33:17 +030038from test.support.script_helper import assert_python_ok, run_python_until_end
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000047
# byteslike() builds the buffer objects that the tests pass to readinto() and
# write().  Which variant gets defined depends on whether ctypes is importable.
try:
    import ctypes
except ImportError:
    def byteslike(*pos, **kw):
        """Return an array.array("b") holding bytes(*pos, **kw).

        Fallback used when ctypes cannot be imported.
        """
        return array.array("b", bytes(*pos, **kw))
else:
    def byteslike(*pos, **kw):
        """Create a bytes-like object having no string or sequence methods"""
        # All arguments are forwarded to bytes(), so byteslike(5) gives five
        # zero bytes and byteslike(b"abc") copies the given data.
        data = bytes(*pos, **kw)
        obj = EmptyStruct()
        # Resize the structure's buffer so that it can hold the data.
        ctypes.resize(obj, len(data))
        memoryview(obj).cast("B")[:] = data
        return obj

    class EmptyStruct(ctypes.Structure):
        """Field-less ctypes structure used by byteslike() as its buffer."""
        pass
63
def _default_chunk_size():
    """Get the default TextIOWrapper chunk size"""
    # Any text-mode file will do; this module's own source file is opened and
    # the _CHUNK_SIZE attribute of the resulting object is returned.
    with open(__file__, "r", encoding="latin-1") as f:
        return f._CHUNK_SIZE
68
69
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto()."""

    def __init__(self, read_stack=()):
        # Chunks still to be handed out by readinto(); a None entry makes
        # readinto() return None once.
        self._read_stack = list(read_stack)
        # Every chunk passed to write(), in call order.
        self._write_stack = []
        # Number of read calls made so far.
        self._reads = 0
        # Number of read calls made after the read stack was exhausted.
        self._extraneous_reads = 0

    def write(self, b):
        """Record a bytes copy of *b* and report all of it as written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next queued chunk into *buf*.

        Returns the number of bytes copied, None when the next queued entry
        is None, or 0 when nothing is queued any more.  A chunk larger than
        *buf* is split; the remainder stays at the head of the queue.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            # The whole chunk fits: consume it.
            del self._read_stack[0]
            buf[:n] = data
            return n
        else:
            # Fill the buffer and keep the tail for the next call.
            buf[:] = data[:max_len]
            self._read_stack[0] = data[max_len:]
            return max_len

    def truncate(self, pos=None):
        return pos
125
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    # MockRawIOWithoutRead on top of RawIOBase from the C io module.
    pass
128
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    # MockRawIOWithoutRead on top of RawIOBase from the pure-Python _pyio.
    pass
131
132
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus an explicit read() method."""

    def read(self, n=None):
        """Pop and return the next queued chunk, ignoring *n*.

        Returns b"" (and counts an extraneous read) once the queue is empty.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue is expected here; anything else
            # (including KeyboardInterrupt) must propagate.
            self._extraneous_reads += 1
            return b""
142
class CMockRawIO(MockRawIO, io.RawIOBase):
    # MockRawIO on top of RawIOBase from the C io module.
    pass
145
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    # MockRawIO on top of RawIOBase from the pure-Python _pyio.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000148
Guido van Rossuma9e20242007-03-08 00:43:48 +0000149
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods return deliberately bogus results."""

    def write(self, b):
        # Report twice as many bytes as were actually recorded.
        return super().write(b) * 2

    def read(self, n=None):
        # Return the queued chunk repeated twice.
        return super().read(n) * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Perform the real readinto(), then report five times the buffer size.
        super().readinto(buf)
        return len(buf) * 5
166
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    # MisbehavedRawIO on top of RawIOBase from the C io module.
    pass
169
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    # MisbehavedRawIO on top of RawIOBase from the pure-Python _pyio.
    pass
172
173
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() call raises OSError."""

    # Class-level default; close() replaces it with 1 on the instance.
    closed = 0

    def close(self):
        if not self.closed:
            # Mark as closed *before* raising, so a second close() is a no-op.
            self.closed = 1
            raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000181
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    # CloseFailureIO on top of RawIOBase from the C io module.
    pass
184
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    # CloseFailureIO on top of RawIOBase from the pure-Python _pyio.
    pass
187
188
class MockFileIO:
    """Mixin that records the result size of every read()/readinto() call.

    It must be combined with a base class that provides __init__(data),
    read() and readinto().
    """

    def __init__(self, data):
        # One entry per read()/readinto() call, in call order.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        res = super().read(n)
        # Log the length of what was read, or None if read() returned None.
        self.read_history.append(None if res is None else len(res))
        return res

    def readinto(self, b):
        res = super().readinto(b)
        self.read_history.append(res)
        return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000204
class CMockFileIO(MockFileIO, io.BytesIO):
    # MockFileIO on top of BytesIO from the C io module.
    pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000207
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    # MockFileIO on top of BytesIO from the pure-Python _pyio.
    pass
210
211
class MockUnseekableIO:
    """Mixin that turns a stream into a non-seekable one.

    Subclasses must provide an ``UnsupportedOperation`` attribute naming the
    exception class to raise.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        raise self.UnsupportedOperation("not seekable")

    def tell(self, *args):
        raise self.UnsupportedOperation("not seekable")

    def truncate(self, *args):
        raise self.UnsupportedOperation("not seekable")
224
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    # Exception class raised by the MockUnseekableIO methods.
    UnsupportedOperation = io.UnsupportedOperation
227
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    # Exception class raised by the MockUnseekableIO methods.
    UnsupportedOperation = pyio.UnsupportedOperation
230
231
class MockNonBlockWriterIO:
    """Raw writer mock that can simulate a write that would block."""

    def __init__(self):
        # Chunks accepted by write() since the last pop_written().
        self._write_stack = []
        # Byte sequence on which write() pretends to block, or None.
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and forget it."""
        s = b"".join(self._write_stack)
        self._write_stack[:] = []
        return s

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Accept *b*, stopping short at the blocker char if one is set.

        Returns the number of bytes accepted, or None when the data starts
        with the blocker char (which also clears the blocker).
        """
        b = bytes(b)
        if self._blocker_char:
            try:
                n = b.index(self._blocker_char)
            except ValueError:
                # No blocker in this chunk: accept it whole below.
                pass
            else:
                if n > 0:
                    # write data up to the first blocker
                    self._write_stack.append(b[:n])
                    return n
                else:
                    # cancel blocker and indicate would block
                    self._blocker_char = None
                    return None
        self._write_stack.append(b)
        return len(b)
275
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    # BlockingIOError class of the C io module, exposed as an attribute.
    BlockingIOError = io.BlockingIOError
278
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    # BlockingIOError class of the pure-Python _pyio, exposed as an attribute.
    BlockingIOError = pyio.BlockingIOError
281
Guido van Rossuma9e20242007-03-08 00:43:48 +0000282
Guido van Rossum28524c72007-02-27 05:47:44 +0000283class IOTest(unittest.TestCase):
284
def setUp(self):
    # Remove any leftover support.TESTFN file before the test runs.
    support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000287
def tearDown(self):
    # Remove the support.TESTFN file the test may have created.
    support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000290
def write_ops(self, f):
    # Exercise write/seek/tell/truncate on the writable binary stream *f*.
    # When all assertions pass, *f* contains b"hello world\n".
    self.assertEqual(f.write(b"blah."), 5)
    # truncate() must not move the stream position.
    f.truncate(0)
    self.assertEqual(f.tell(), 5)
    f.seek(0)

    self.assertEqual(f.write(b"blah."), 5)
    self.assertEqual(f.seek(0), 0)
    self.assertEqual(f.write(b"Hello."), 6)
    self.assertEqual(f.tell(), 6)
    self.assertEqual(f.seek(-1, 1), 5)
    self.assertEqual(f.tell(), 5)
    buffer = bytearray(b" world\n\n\n")
    self.assertEqual(f.write(buffer), 9)
    buffer[:] = b"*" * 9  # Overwrite our copy of the data
    self.assertEqual(f.seek(0), 0)
    self.assertEqual(f.write(b"h"), 1)
    self.assertEqual(f.seek(-1, 2), 13)
    self.assertEqual(f.tell(), 13)

    self.assertEqual(f.truncate(12), 12)
    self.assertEqual(f.tell(), 13)
    # Float offsets are rejected.
    self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000314
def read_ops(self, f, buffered=False):
    # Exercise read/readinto/seek/tell on the readable binary stream *f*,
    # which must contain b"hello world\n".  With buffered=True, read() without
    # a size and readinto1() are checked as well.
    data = f.read(5)
    self.assertEqual(data, b"hello")
    data = byteslike(data)
    self.assertEqual(f.readinto(data), 5)
    self.assertEqual(bytes(data), b" worl")
    data = bytearray(5)
    # Only two bytes are left; the buffer itself keeps its length.
    self.assertEqual(f.readinto(data), 2)
    self.assertEqual(len(data), 5)
    self.assertEqual(data[:2], b"d\n")
    self.assertEqual(f.seek(0), 0)
    self.assertEqual(f.read(20), b"hello world\n")
    self.assertEqual(f.read(1), b"")
    self.assertEqual(f.readinto(byteslike(b"x")), 0)
    self.assertEqual(f.seek(-6, 2), 6)
    self.assertEqual(f.read(5), b"world")
    self.assertEqual(f.read(0), b"")
    self.assertEqual(f.readinto(byteslike()), 0)
    self.assertEqual(f.seek(-6, 1), 5)
    self.assertEqual(f.read(5), b" worl")
    self.assertEqual(f.tell(), 10)
    # Float offsets are rejected.
    self.assertRaises(TypeError, f.seek, 0.0)
    if buffered:
        f.seek(0)
        self.assertEqual(f.read(), b"hello world\n")
        f.seek(6)
        self.assertEqual(f.read(), b"world\n")
        self.assertEqual(f.read(), b"")
        f.seek(0)
        data = byteslike(5)
        self.assertEqual(f.readinto1(data), 5)
        self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000347
# File offset used by large_file_ops(): one past the largest signed 32-bit
# integer.
LARGE = 2**31
349
def large_file_ops(self, f):
    # Exercise seek/tell/write/truncate/read around offset self.LARGE on the
    # read-write binary stream *f*.  The test is skipped when seeking that far
    # raises OverflowError or ValueError.
    #
    # assertTrue() rather than a bare assert, so the precondition is still
    # checked when Python runs with -O.
    self.assertTrue(f.readable())
    self.assertTrue(f.writable())
    try:
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
    except (OverflowError, ValueError):
        self.skipTest("no largefile support")
    self.assertEqual(f.tell(), self.LARGE)
    self.assertEqual(f.write(b"xxx"), 3)
    self.assertEqual(f.tell(), self.LARGE + 3)
    self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
    self.assertEqual(f.truncate(), self.LARGE + 2)
    self.assertEqual(f.tell(), self.LARGE + 2)
    self.assertEqual(f.seek(0, 2), self.LARGE + 2)
    self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
    # truncate() must not move the stream position.
    self.assertEqual(f.tell(), self.LARGE + 2)
    self.assertEqual(f.seek(0, 2), self.LARGE + 1)
    self.assertEqual(f.seek(-1, 2), self.LARGE)
    self.assertEqual(f.read(2), b"x")
369
def test_invalid_operations(self):
    # Try writing on a file opened in read mode and vice-versa.
    exc = self.UnsupportedOperation
    for mode in ("w", "wb"):
        with self.open(support.TESTFN, mode) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
    with self.open(support.TESTFN, "wb", buffering=0) as fp:
        self.assertRaises(exc, fp.read)
        self.assertRaises(exc, fp.readline)
    with self.open(support.TESTFN, "rb", buffering=0) as fp:
        self.assertRaises(exc, fp.write, b"blah")
        self.assertRaises(exc, fp.writelines, [b"blah\n"])
    with self.open(support.TESTFN, "rb") as fp:
        self.assertRaises(exc, fp.write, b"blah")
        self.assertRaises(exc, fp.writelines, [b"blah\n"])
    with self.open(support.TESTFN, "r") as fp:
        self.assertRaises(exc, fp.write, "blah")
        self.assertRaises(exc, fp.writelines, ["blah\n"])
        # Non-zero seeking from current or end pos
        self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
        self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000392
Martin Panter754aab22016-03-31 07:21:56 +0000393 def test_optional_abilities(self):
394 # Test for OSError when optional APIs are not supported
395 # The purpose of this test is to try fileno(), reading, writing and
396 # seeking operations with various objects that indicate they do not
397 # support these operations.
398
399 def pipe_reader():
400 [r, w] = os.pipe()
401 os.close(w) # So that read() is harmless
402 return self.FileIO(r, "r")
403
404 def pipe_writer():
405 [r, w] = os.pipe()
406 self.addCleanup(os.close, r)
407 # Guarantee that we can write into the pipe without blocking
408 thread = threading.Thread(target=os.read, args=(r, 100))
409 thread.start()
410 self.addCleanup(thread.join)
411 return self.FileIO(w, "w")
412
413 def buffered_reader():
414 return self.BufferedReader(self.MockUnseekableIO())
415
416 def buffered_writer():
417 return self.BufferedWriter(self.MockUnseekableIO())
418
419 def buffered_random():
420 return self.BufferedRandom(self.BytesIO())
421
422 def buffered_rw_pair():
423 return self.BufferedRWPair(self.MockUnseekableIO(),
424 self.MockUnseekableIO())
425
426 def text_reader():
427 class UnseekableReader(self.MockUnseekableIO):
428 writable = self.BufferedIOBase.writable
429 write = self.BufferedIOBase.write
430 return self.TextIOWrapper(UnseekableReader(), "ascii")
431
432 def text_writer():
433 class UnseekableWriter(self.MockUnseekableIO):
434 readable = self.BufferedIOBase.readable
435 read = self.BufferedIOBase.read
436 return self.TextIOWrapper(UnseekableWriter(), "ascii")
437
438 tests = (
439 (pipe_reader, "fr"), (pipe_writer, "fw"),
440 (buffered_reader, "r"), (buffered_writer, "w"),
441 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
442 (text_reader, "r"), (text_writer, "w"),
443 (self.BytesIO, "rws"), (self.StringIO, "rws"),
444 )
445 for [test, abilities] in tests:
446 if test is pipe_writer and not threading:
447 continue # Skip subtest that uses a background thread
448 with self.subTest(test), test() as obj:
449 readable = "r" in abilities
450 self.assertEqual(obj.readable(), readable)
451 writable = "w" in abilities
452 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000453
454 if isinstance(obj, self.TextIOBase):
455 data = "3"
456 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
457 data = b"3"
458 else:
459 self.fail("Unknown base class")
460
461 if "f" in abilities:
462 obj.fileno()
463 else:
464 self.assertRaises(OSError, obj.fileno)
465
466 if readable:
467 obj.read(1)
468 obj.read()
469 else:
470 self.assertRaises(OSError, obj.read, 1)
471 self.assertRaises(OSError, obj.read)
472
473 if writable:
474 obj.write(data)
475 else:
476 self.assertRaises(OSError, obj.write, data)
477
Martin Panter3ee147f2016-03-31 21:05:31 +0000478 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000479 pipe_reader, pipe_writer):
480 # Pipes seem to appear as seekable on Windows
481 continue
482 seekable = "s" in abilities
483 self.assertEqual(obj.seekable(), seekable)
484
Martin Panter754aab22016-03-31 07:21:56 +0000485 if seekable:
486 obj.tell()
487 obj.seek(0)
488 else:
489 self.assertRaises(OSError, obj.tell)
490 self.assertRaises(OSError, obj.seek, 0)
491
492 if writable and seekable:
493 obj.truncate()
494 obj.truncate(0)
495 else:
496 self.assertRaises(OSError, obj.truncate)
497 self.assertRaises(OSError, obj.truncate, 0)
498
Antoine Pitrou13348842012-01-29 18:36:34 +0100499 def test_open_handles_NUL_chars(self):
500 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300501 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100502
503 bytes_fn = bytes(fn_with_NUL, 'ascii')
504 with warnings.catch_warnings():
505 warnings.simplefilter("ignore", DeprecationWarning)
506 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100507
def test_raw_file_io(self):
    # Run write_ops()/read_ops() against an unbuffered (buffering=0) file.
    with self.open(support.TESTFN, "wb", buffering=0) as f:
        self.assertEqual(f.readable(), False)
        self.assertEqual(f.writable(), True)
        self.assertEqual(f.seekable(), True)
        self.write_ops(f)
    with self.open(support.TESTFN, "rb", buffering=0) as f:
        self.assertEqual(f.readable(), True)
        self.assertEqual(f.writable(), False)
        self.assertEqual(f.seekable(), True)
        self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000519
def test_buffered_file_io(self):
    # Run write_ops()/read_ops() against a file opened with default buffering.
    with self.open(support.TESTFN, "wb") as f:
        self.assertEqual(f.readable(), False)
        self.assertEqual(f.writable(), True)
        self.assertEqual(f.seekable(), True)
        self.write_ops(f)
    with self.open(support.TESTFN, "rb") as f:
        self.assertEqual(f.readable(), True)
        self.assertEqual(f.writable(), False)
        self.assertEqual(f.seekable(), True)
        self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000531
def test_readline(self):
    # readline() with and without a size limit, including a line containing
    # a NUL byte and a final line without a trailing newline.
    with self.open(support.TESTFN, "wb") as f:
        f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
    with self.open(support.TESTFN, "rb") as f:
        self.assertEqual(f.readline(), b"abc\n")
        self.assertEqual(f.readline(10), b"def\n")
        self.assertEqual(f.readline(2), b"xy")
        self.assertEqual(f.readline(4), b"zzy\n")
        self.assertEqual(f.readline(), b"foo\x00bar\n")
        self.assertEqual(f.readline(None), b"another line")
        # A float size is rejected in binary mode ...
        self.assertRaises(TypeError, f.readline, 5.3)
    with self.open(support.TESTFN, "r") as f:
        # ... and in text mode.
        self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000545
def test_raw_bytes_io(self):
    # Run write_ops()/read_ops() against in-memory BytesIO objects.
    f = self.BytesIO()
    self.write_ops(f)
    data = f.getvalue()
    self.assertEqual(data, b"hello world\n")
    f = self.BytesIO(data)
    self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000553
def test_large_file_ops(self):
    # On Windows and Mac OSX this test consumes large resources; It takes
    # a long time to build the >2GB file and takes >2GB of disk space
    # therefore the resource must be enabled to run this test.
    if sys.platform.startswith('win') or sys.platform == 'darwin':
        support.requires(
            'largefile',
            'test requires %s bytes and a long time to run' % self.LARGE)
    # Once unbuffered, once with default buffering.
    with self.open(support.TESTFN, "w+b", 0) as f:
        self.large_file_ops(f)
    with self.open(support.TESTFN, "w+b") as f:
        self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000566
def test_with_open(self):
    # The file must be closed when the with block exits, both normally and
    # through an exception.
    for bufsize in (0, 1, 100):
        f = None
        with self.open(support.TESTFN, "wb", bufsize) as f:
            f.write(b"xxx")
        self.assertEqual(f.closed, True)
        f = None
        try:
            with self.open(support.TESTFN, "wb", bufsize) as f:
                1/0
        except ZeroDivisionError:
            self.assertEqual(f.closed, True)
        else:
            self.fail("1/0 didn't raise an exception")
581
# issue 5008
def test_append_mode_tell(self):
    # tell() on a file freshly opened in append mode must report the end of
    # the existing content, not 0.
    with self.open(support.TESTFN, "wb") as f:
        f.write(b"xxx")
    with self.open(support.TESTFN, "ab", buffering=0) as f:
        self.assertEqual(f.tell(), 3)
    with self.open(support.TESTFN, "ab") as f:
        self.assertEqual(f.tell(), 3)
    with self.open(support.TESTFN, "a") as f:
        self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000592
def test_destructor(self):
    # Destroying an unclosed FileIO subclass must call __del__, close() and
    # flush() in that order, and the written data must reach the file.
    record = []
    class MyFileIO(self.FileIO):
        def __del__(self):
            record.append(1)
            # The base class may or may not define __del__.
            try:
                f = super().__del__
            except AttributeError:
                pass
            else:
                f()
        def close(self):
            record.append(2)
            super().close()
        def flush(self):
            record.append(3)
            super().flush()
    with support.check_warnings(('', ResourceWarning)):
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000618
def _check_base_destructor(self, base):
    # Destroying an instance of a *base* subclass must call __del__, close()
    # and flush() in that order, with instance attributes still readable.
    record = []
    class MyIO(base):
        def __init__(self):
            # This exercises the availability of attributes on object
            # destruction.
            # (in the C version, close() is called by the tp_dealloc
            # function, not by __del__)
            self.on_del = 1
            self.on_close = 2
            self.on_flush = 3
        def __del__(self):
            record.append(self.on_del)
            # The base class may or may not define __del__.
            try:
                f = super().__del__
            except AttributeError:
                pass
            else:
                f()
        def close(self):
            record.append(self.on_close)
            super().close()
        def flush(self):
            record.append(self.on_flush)
            super().flush()
    f = MyIO()
    del f
    support.gc_collect()
    self.assertEqual(record, [1, 2, 3])
648
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000649 def test_IOBase_destructor(self):
650 self._check_base_destructor(self.IOBase)
651
652 def test_RawIOBase_destructor(self):
653 self._check_base_destructor(self.RawIOBase)
654
655 def test_BufferedIOBase_destructor(self):
656 self._check_base_destructor(self.BufferedIOBase)
657
658 def test_TextIOBase_destructor(self):
659 self._check_base_destructor(self.TextIOBase)
660
def test_close_flushes(self):
    # Data written through a buffered file must be on disk once the file
    # has been closed.
    with self.open(support.TESTFN, "wb") as f:
        f.write(b"xxx")
    with self.open(support.TESTFN, "rb") as f:
        self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000666
def test_array_writes(self):
    # write() and writelines() must accept an array.array and report its
    # size in bytes, not its number of items.
    a = array.array('i', range(10))
    n = len(a.tobytes())
    def check(f):
        with f:
            self.assertEqual(f.write(a), n)
            f.writelines((a,))
    check(self.BytesIO())
    check(self.FileIO(support.TESTFN, "w"))
    check(self.BufferedWriter(self.MockRawIO()))
    check(self.BufferedRandom(self.MockRawIO()))
    check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000679
def test_closefd(self):
    # closefd=False is rejected when opening by file name for writing.
    self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                      closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000683
def test_read_closed(self):
    # Reading from a closed file object must raise ValueError, even when it
    # was opened with closefd=False on a descriptor that is still open.
    with self.open(support.TESTFN, "w") as f:
        f.write("egg\n")
    with self.open(support.TESTFN, "r") as f:
        file = self.open(f.fileno(), "r", closefd=False)
        self.assertEqual(file.read(), "egg\n")
        file.seek(0)
        file.close()
        self.assertRaises(ValueError, file.read)
693
def test_no_closefd_with_filename(self):
    # can't use closefd in combination with a file name
    self.assertRaises(ValueError, self.open, support.TESTFN, "r",
                      closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000697
def test_closefd_attr(self):
    # The raw file's closefd attribute must reflect how the file was opened.
    with self.open(support.TESTFN, "wb") as f:
        f.write(b"egg\n")
    with self.open(support.TESTFN, "r") as f:
        self.assertEqual(f.buffer.raw.closefd, True)
        file = self.open(f.fileno(), "r", closefd=False)
        self.assertEqual(file.buffer.raw.closefd, False)
705
def test_garbage_collection(self):
    # FileIO objects are collected, and collecting them flushes
    # all data to disk.
    with support.check_warnings(('', ResourceWarning)):
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Create a reference cycle through the object itself.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
    self.assertIsNone(wr(), wr)
    with self.open(support.TESTFN, "rb") as f:
        self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000719
def test_unbounded_file(self):
    # Issue #1174606: reading from an unbounded stream such as /dev/zero.
    zero = "/dev/zero"
    if not os.path.exists(zero):
        self.skipTest("{0} does not exist".format(zero))
    if sys.maxsize > 0x7FFFFFFF:
        self.skipTest("test can only run in a 32-bit address space")
    if support.real_max_memuse < support._2G:
        self.skipTest("test requires at least 2GB of memory")
    # Unbuffered binary, buffered binary, then text: an unbounded read()
    # must raise OverflowError in each case.
    open_variants = (
        ("rb", {"buffering": 0}),
        ("rb", {}),
        ("r", {}),
    )
    for mode, extra in open_variants:
        with self.open(zero, mode, **extra) as stream:
            with self.assertRaises(OverflowError):
                stream.read()
735
def check_flush_error_on_close(self, *args, **kwargs):
    # Helper: open a file with the given arguments, make its flush()
    # fail, and check that close() propagates the error, that the file
    # ends up closed anyway, and that flush() ran before the file was
    # closed.
    stream = self.open(*args, **kwargs)
    closed_at_flush = []

    def failing_flush():
        # Remember whether the file was already closed when flush() ran.
        closed_at_flush[:] = [stream.closed]
        raise OSError()

    stream.flush = failing_flush
    with self.assertRaises(OSError):  # exception not swallowed
        stream.close()
    self.assertTrue(stream.closed)
    self.assertTrue(closed_at_flush)      # flush() called
    self.assertFalse(closed_at_flush[0])  # flush() called before file closed
    stream.flush = lambda: None  # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200750
def test_flush_error_on_close(self):
    # Issue #5700: io.FileIO calls flush() after file closed
    # Run the check for each layer, opening the file by name, by
    # descriptor, and by descriptor with closefd=False (in which case the
    # descriptor is closed here).
    open_flags = os.O_WRONLY|os.O_CREAT
    layers = (
        ('wb', {'buffering': 0}),  # raw file
        ('wb', {}),                # buffered io
        ('w', {}),                 # text io
    )
    for mode, extra in layers:
        self.check_flush_error_on_close(support.TESTFN, mode, **extra)
        fd = os.open(support.TESTFN, open_flags)
        self.check_flush_error_on_close(fd, mode, **extra)
        fd = os.open(support.TESTFN, open_flags)
        self.check_flush_error_on_close(fd, mode, **extra, closefd=False)
        os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000774
def test_multi_close(self):
    # close() may be called repeatedly on an unbuffered file; flush() on
    # the closed file raises ValueError.
    raw = self.open(support.TESTFN, "wb", buffering=0)
    for _ in range(3):
        raw.close()
    with self.assertRaises(ValueError):
        raw.flush()
781
def test_RawIOBase_read(self):
    # Exercise the default RawIOBase.read() implementation (which calls
    # readinto() internally).
    rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
    # Results expected from successive read(2) calls, in order.
    expected_results = (b"ab", b"c", b"d", None, b"ef", b"g", None, b"")
    for expected in expected_results:
        self.assertEqual(rawio.read(2), expected)
794
def test_types_have_dict(self):
    # Instances of the base classes and of the in-memory streams must
    # all expose a __dict__.
    instances = (
        self.IOBase(),
        self.RawIOBase(),
        self.TextIOBase(),
        self.StringIO(),
        self.BytesIO()
    )
    for instance in instances:
        self.assertTrue(hasattr(instance, "__dict__"))
805
def test_opener(self):
    # A custom opener supplies the descriptor: the path passed to open()
    # is ignored by it and the content of TESTFN is read instead.
    with self.open(support.TESTFN, "w") as writer:
        writer.write("egg\n")
    fd = os.open(support.TESTFN, os.O_RDONLY)
    opener = lambda path, flags: fd
    with self.open("non-existent", "r", opener=opener) as reader:
        self.assertEqual(reader.read(), "egg\n")
814
def test_bad_opener_negative_1(self):
    # Issue #27066.
    # An opener returning -1 must be reported as a ValueError.
    # NOTE: this calls the builtin open(), not self.open.
    badopener = lambda fname, flags: -1
    with self.assertRaises(ValueError) as caught:
        open('non-existent', 'r', opener=badopener)
    self.assertEqual(str(caught.exception), 'opener returned -1')
822
def test_bad_opener_other_negative(self):
    # Issue #27066.
    # Any other negative value from the opener is reported the same way.
    # NOTE: this calls the builtin open(), not self.open.
    badopener = lambda fname, flags: -2
    with self.assertRaises(ValueError) as caught:
        open('non-existent', 'r', opener=badopener)
    self.assertEqual(str(caught.exception), 'opener returned -2')
830
def test_fileio_closefd(self):
    # Issue #4841
    # A FileIO created with closefd=False must never close a descriptor
    # it was given, neither when re-initialised nor when closed.
    with self.open(__file__, 'rb') as first:
        with self.open(__file__, 'rb') as second:
            borrowed = self.FileIO(first.fileno(), closefd=False)
            # .__init__() must not close first
            borrowed.__init__(second.fileno(), closefd=False)
            first.readline()
            # .close() must not close second
            borrowed.close()
            second.readline()
842
def test_nonbuffered_textio(self):
    # Text mode with buffering=0 is rejected with ValueError, and the
    # failed open must not emit a ResourceWarning.
    with support.check_no_resource_warning(self), \
         self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300847
def test_invalid_newline(self):
    # An unsupported newline argument is rejected with ValueError, and
    # the failed open must not emit a ResourceWarning.
    with support.check_no_resource_warning(self), \
         self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300852
def test_buffered_readinto_mixin(self):
    # Test the implementation provided by BufferedIOBase
    class Stream(self.BufferedIOBase):
        # Only read()/read1() are defined here; readinto() and
        # readinto1() are inherited from BufferedIOBase.
        def read(self, size):
            return b"12345"
        read1 = read

    stream = Stream()
    for name in ("readinto", "readinto1"):
        with self.subTest(name):
            target = byteslike(5)
            filled = getattr(stream, name)(target)
            self.assertEqual(filled, 5)
            self.assertEqual(bytes(target), b"12345")
865
def test_fspath_support(self):
    # open() must accept objects implementing __fspath__.
    class PathLike:
        # Minimal path-like wrapper returning whatever it was given.
        def __init__(self, path):
            self.path = path

        def __fspath__(self):
            return self.path

    def check_path_succeeds(path):
        # Write through the path-like object, then read the text back.
        with self.open(path, "w") as writer:
            writer.write("egg\n")

        with self.open(path, "r") as reader:
            self.assertEqual(reader.read(), "egg\n")

    # Both str and bytes results from __fspath__ are accepted.
    for wrapped in (support.TESTFN, support.TESTFN.encode('utf-8')):
        check_path_succeeds(PathLike(wrapped))

    # __fspath__ returning something that is not a path is a TypeError.
    bad_path = PathLike(TypeError)
    with self.assertRaises(TypeError):
        self.open(bad_path, 'w')

    # ensure that refcounting is correct with some error conditions
    with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
        self.open(PathLike(support.TESTFN), 'rwxa')
891
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200892
class CIOTest(IOTest):
    # IOTest subclass carrying an additional finalization regression test.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        instance = MyIO()
        # Tie the instance into a cycle with itself.
        instance.obj = instance
        ref = weakref.ref(instance)
        # Drop the class first, then the instance, and let the collector
        # reclaim both.
        del MyIO
        del instance
        support.gc_collect()
        self.assertIsNone(ref(), ref)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000912
class PyIOTest(IOTest):
    # Adds no tests of its own; everything is inherited from IOTest.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000915
Guido van Rossuma9e20242007-03-08 00:43:48 +0000916
@support.cpython_only
class APIMismatchTest(unittest.TestCase):
    # Compares the RawIOBase API of the C module (io) with that of the
    # pure-Python module (pyio), in both directions.

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        missing = support.detect_api_mismatch(
            pyio.RawIOBase,
            io.RawIOBase,
            ignore=('__weakref__',),
        )
        self.assertEqual(missing, set(), msg='Python RawIOBase does not have all C RawIOBase methods')

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        missing = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        self.assertEqual(missing, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
930
931
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom.
    # The class under test is read from ``self.tp`` and the raw-stream
    # doubles from attributes such as ``self.MockRawIO``.

    def test_detach(self):
        # detach() returns the raw stream once; a second call fails.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        self.assertIs(buffered.detach(), raw_stream)
        with self.assertRaises(ValueError):
            buffered.detach()

        repr(buffered)  # Should still work

    def test_fileno(self):
        # With a MockRawIO underneath, fileno() is expected to be 42.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)

        self.assertEqual(42, buffered.fileno())

    def test_invalid_args(self):
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        # Invalid whence
        for bad_whence in (-1, 9):
            with self.assertRaises(ValueError):
                buffered.seek(0, bad_whence)

    def test_override_destructor(self):
        # Collecting an instance of a subclass must run the overridden
        # __del__, close() and flush(), in that order.
        calls = []

        class MyBufferedIO(self.tp):
            def __del__(self):
                calls.append(1)
                try:
                    parent_del = super().__del__
                except AttributeError:
                    pass
                else:
                    parent_del()

            def close(self):
                calls.append(2)
                super().close()

            def flush(self):
                calls.append(3)
                super().flush()

        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        del bufio
        support.gc_collect()
        self.assertEqual(calls, [1, 2, 3])

    def test_context_manager(self):
        # Test usability as a context manager
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)

        def enter_and_leave():
            with buffered:
                pass

        enter_and_leave()
        # buffered should now be closed, and using it a second time should
        # raise a ValueError.
        with self.assertRaises(ValueError):
            enter_and_leave()

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()

        def touch_missing_attribute():
            self.tp(rawio).xyzzy

        with support.captured_output("stderr") as captured:
            self.assertRaises(AttributeError, touch_missing_attribute)
        output = captured.getvalue().strip()
        if output:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(output.splitlines()), 1)
            self.assertTrue(output.startswith("Exception OSError: "), output)
            self.assertTrue(output.endswith(" ignored"), output)

    def test_repr(self):
        # repr() shows the qualified class name, plus the raw stream's
        # name once it has one (str or bytes).
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
        self.assertEqual(repr(buffered), "<%s>" % clsname)
        raw_stream.name = "dummy"
        self.assertEqual(repr(buffered), "<%s name='dummy'>" % clsname)
        raw_stream.name = b"dummy"
        self.assertEqual(repr(buffered), "<%s name=b'dummy'>" % clsname)

    def test_recursive_repr(self):
        # Issue #25455
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        # The raw stream's name is the buffered object itself, so repr()
        # recurses; a RuntimeError is an acceptable outcome.
        with support.swap_attr(raw_stream, 'name', buffered):
            try:
                repr(buffered)  # Should not crash
            except RuntimeError:
                pass

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw_stream = self.MockRawIO()
        closed_at_flush = []

        def failing_flush():
            # ``buffered`` is bound below, before this can be called.
            closed_at_flush[:] = [buffered.closed, raw_stream.closed]
            raise OSError()

        raw_stream.flush = failing_flush
        buffered = self.tp(raw_stream)
        with self.assertRaises(OSError):  # exception not swallowed
            buffered.close()
        self.assertTrue(buffered.closed)
        self.assertTrue(raw_stream.closed)
        self.assertTrue(closed_at_flush)      # flush() called
        self.assertFalse(closed_at_flush[0])  # flush() called before file closed
        self.assertFalse(closed_at_flush[1])
        raw_stream.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and the raw close() fail, the close error is
        # raised with the flush error as its __context__.
        raw_stream = self.MockRawIO()

        def failing_flush():
            raise OSError('flush')

        def failing_close():
            raise OSError('close')

        raw_stream.close = failing_close
        buffered = self.tp(raw_stream)
        buffered.flush = failing_flush
        with self.assertRaises(OSError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertEqual(error.args, ('close',))
        self.assertIsInstance(error.__context__, OSError)
        self.assertEqual(error.__context__.args, ('flush',))
        self.assertFalse(buffered.closed)

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        raw_stream = self.MockRawIO()

        # Both functions fail with NameError because the names they
        # raise are deliberately undefined.
        def failing_flush():
            raise non_existing_flush

        def failing_close():
            raise non_existing_close

        raw_stream.close = failing_close
        buffered = self.tp(raw_stream)
        buffered.flush = failing_flush
        with self.assertRaises(NameError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertIn('non_existing_close', str(error))
        self.assertIsInstance(error.__context__, NameError)
        self.assertIn('non_existing_flush', str(error.__context__))
        self.assertFalse(buffered.closed)

    def test_multi_close(self):
        # close() may be called repeatedly; flush() afterwards raises
        # ValueError.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        for _ in range(3):
            buffered.close()
        with self.assertRaises(ValueError):
            buffered.flush()

    def test_unseekable(self):
        # tell() and seek() on an unseekable raw stream raise
        # UnsupportedOperation.
        buffered = self.tp(self.MockUnseekableIO(b"A" * 10))
        with self.assertRaises(self.UnsupportedOperation):
            buffered.tell()
        with self.assertRaises(self.UnsupportedOperation):
            buffered.seek(0)

    def test_readonly_attributes(self):
        # The ``raw`` attribute cannot be reassigned.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        replacement = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buffered.raw = replacement
1097
Guido van Rossum78892e42007-04-06 17:31:18 +00001098
class SizeofTest:
    # sys.getsizeof() checks for buffered objects; the class under test
    # is read from ``self.tp``.

    @support.cpython_only
    def test_sizeof(self):
        # The reported size must grow by exactly the buffer size: the
        # size without the buffer is the same for both buffer sizes.
        small_buffer = 4096
        large_buffer = 8192
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=small_buffer)
        size_without_buffer = sys.getsizeof(bufio) - small_buffer
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=large_buffer)
        self.assertEqual(sys.getsizeof(bufio),
                         size_without_buffer + large_buffer)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the reported size no longer includes the buffer.
        bufsize = 4096
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=bufsize)
        size_without_buffer = sys.getsizeof(bufio) - bufsize
        bufio.close()
        self.assertEqual(sys.getsizeof(bufio), size_without_buffer)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001120
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Tests for the BufferedReader implementation found in ``self.tp``.
    # The raw-stream doubles (MockRawIO, MockFileIO, ...) are read from
    # attributes of the test instance.
    read_mode = "rb"

    def test_constructor(self):
        # __init__() may be called again on an existing object; invalid
        # buffer sizes are rejected with ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_uninitialized(self):
        # An object created with __new__ only can be deleted safely and
        # fails cleanly on read() until __init__() has run.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.read, 0)
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.read(0), b'')

    def test_read(self):
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # read1() returns buffered data first and otherwise performs at
        # most one raw read; rawio._reads counts the raw reads so far.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"", bufio.read1(0))
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)

    def test_read1_arbitrary(self):
        # read1() without a size, or with -1, returns whatever is available.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"bc", bufio.read1())
        self.assertEqual(b"d", bufio.read1())
        self.assertEqual(b"efg", bufio.read1(-1))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1())
        self.assertEqual(rawio._reads, 4)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        # (bytes read, buffer contents afterwards) for successive calls;
        # a short read leaves the tail of the buffer untouched.
        steps = ((2, b"ab"), (2, b"cd"), (2, b"ef"), (1, b"gf"), (0, b"gf"))
        for count, contents in steps:
            self.assertEqual(bufio.readinto(b), count)
            self.assertEqual(b, contents)
        # Same buffer, new stream ending in None.
        rawio = self.MockRawIO((b"abc", None))
        bufio = self.tp(rawio)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"cb")

    def test_readinto1(self):
        # readinto1() is checked together with peek() and the raw read
        # counter rawio._reads.
        buffer_size = 10
        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
        bufio = self.tp(rawio, buffer_size=buffer_size)
        b = bytearray(2)
        self.assertEqual(bufio.peek(3), b'abc')
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 1)
        self.assertEqual(b[:1], b"c")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"de")
        self.assertEqual(rawio._reads, 2)
        b = bytearray(2*buffer_size)
        self.assertEqual(bufio.peek(3), b'fgh')
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(bufio.readinto1(b), 6)
        self.assertEqual(b[:6], b"fghjkl")
        self.assertEqual(rawio._reads, 4)

    def _check_readinto_array(self, method_name):
        # Shared body of test_readinto_array and test_readinto1_array.
        # *method_name* names the bufio method to call ("readinto" or
        # "readinto1") with an array whose items are wider than one byte.
        buffer_size = 60
        data = b"a" * 26
        rawio = self.MockRawIO((data,))
        bufio = self.tp(rawio, buffer_size=buffer_size)

        # Create an array with element size > 1 byte
        b = array.array('i', b'x' * 32)
        # Fixture sanity check, done through unittest so that it is not
        # stripped when running under -O.
        self.assertNotEqual(len(b), 16)

        # Read into it. We should get as many *bytes* as we can fit into b
        # (which is more than the number of elements)
        n = getattr(bufio, method_name)(b)
        self.assertGreater(n, len(b))

        # Check that old contents of b are preserved
        bm = memoryview(b).cast('B')
        self.assertLess(n, len(bm))
        self.assertEqual(bm[:n], data[:n])
        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))

    def test_readinto_array(self):
        self._check_readinto_array("readinto")

    def test_readinto1_array(self):
        self._check_readinto_array("readinto1")

    def test_readlines(self):
        def bufio():
            # A fresh reader over the same three chunks for every check.
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes passed to bufio.read(), and the
        # read sizes expected in rawio.read_history.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        # The raw stream's own readall() behaves the same way.
        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        # Asking for more than is available returns everything there is.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            l = list(range(256)) * N
            random.shuffle(l)
            s = bytes(bytearray(l))
            with self.open(support.TESTFN, "wb") as f:
                f.write(s)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def f():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                # Every byte value must have been read exactly N times.
                s = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # tell()/seek() fail on an unseekable stream both before and
        # after some data has been read.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        bufio.read(1)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        self.assertRaises(self.UnsupportedOperation, bufio.tell)

    def test_misbehaved_io(self):
        # With a MisbehavedRawIO underneath, seek() and tell() raise OSError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))

    def test_read_on_closed(self):
        # Issue #23796
        # NOTE: uses io.BufferedReader directly rather than self.tp.
        b = io.BufferedReader(io.BytesIO(b"12"))
        b.read(1)
        b.close()
        self.assertRaises(ValueError, b.peek)
        self.assertRaises(ValueError, b.read1, 1)
1406
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001407
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the BufferedReader tests against io.BufferedReader and adds
    # further checks of its own.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        expected_errors = (OverflowError, MemoryError, ValueError)
        with self.assertRaises(expected_errors):
            bufio.__init__(rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed re-initialisation the object must refuse to
        # read instead of operating on stale state.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                bufio.__init__(rawio, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                bufio.read()

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        with self.assertRaises(OSError):
            bufio.read(10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            buffered = self.tp(rawio)
            # Self-reference: only the cyclic collector can free it.
            buffered.f = buffered
            ref = weakref.ref(buffered)
            del buffered
            support.gc_collect()
        self.assertIsNone(ref(), ref)

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError message must
        # mention "BufferedReader".
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1454
1455
class PyBufferedReaderTest(BufferedReaderTest):
    """Run the buffered reader tests against pyio.BufferedReader."""
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001458
Guido van Rossuma9e20242007-03-08 00:43:48 +00001459
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    """Implementation-independent tests for buffered writers.

    Concrete subclasses supply the class under test through ``tp``.
    """
    write_mode = "wb"

    def test_constructor(self):
        raw = self.MockRawIO()
        bufio = self.tp(raw)
        # Re-running __init__ with valid arguments keeps the object usable.
        bufio.__init__(raw)
        for size in (1024, 16):
            bufio.__init__(raw, buffer_size=size)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        # Zero and negative buffer sizes are rejected.
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                bufio.__init__(raw, buffer_size=bad_size)
        bufio.__init__(raw)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(raw._write_stack), b"abcghi")

    def test_uninitialized(self):
        # Create and drop an object on which __init__ never ran.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        with self.assertRaisesRegex((ValueError, AttributeError),
                                    'uninitialized|has no attribute'):
            bufio.write(b'')
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.write(b''), 0)

    def test_detach_flush(self):
        # Data still sitting in the buffer reaches the raw stream on
        # detach().
        raw = self.MockRawIO()
        bufio = self.tp(raw)
        bufio.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        bufio.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        raw = self.MockRawIO()
        bufio = self.tp(raw, 8)
        bufio.write(b"abc")
        self.assertFalse(raw._write_stack)
        scratch = bytearray(b"def")
        bufio.write(scratch)
        scratch[:] = b"***"  # Overwrite our copy of the data
        bufio.flush()
        self.assertEqual(b"".join(raw._write_stack), b"abcdef")

    def test_write_overflow(self):
        raw = self.MockRawIO()
        bufio = self.tp(raw, 8)
        contents = b"abcdefghijklmnop"
        step = 3
        for start in range(0, len(contents), step):
            bufio.write(contents[start:start + step])
        flushed = b"".join(raw._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is called after every single write.
        contents = bytes(range(256)) * 1000
        total = len(contents)
        raw = self.MockRawIO()
        bufio = self.tp(raw, 13)
        # Write sizes: every size N is used 15 times before moving to N+1.
        sizes = (size for size in count(1) for _ in range(15))
        offset = 0
        while offset < total:
            size = min(next(sizes), total - offset)
            chunk = contents[offset:offset + size]
            self.assertEqual(bufio.write(chunk), size)
            intermediate_func(bufio)
            offset += size
        bufio.flush()
        self.assertEqual(contents, b"".join(raw._write_stack))

    def test_writes(self):
        def do_nothing(bufio):
            return None
        self.check_writes(do_nothing)

    def test_writes_and_flushes(self):
        def flush(bufio):
            return bufio.flush()
        self.check_writes(flush)

    def test_writes_and_seeks(self):
        def seek_absolute(bufio):
            here = bufio.tell()
            for target in (here + 1, here - 1, here):
                bufio.seek(target, 0)
        self.check_writes(seek_absolute)

        def seek_relative(bufio):
            here = bufio.seek(0, 1)
            for delta in (+1, -1):
                bufio.seek(delta, 1)
            bufio.seek(here, 0)
        self.check_writes(seek_relative)

    def test_writes_and_truncates(self):
        def truncate_here(bufio):
            return bufio.truncate(bufio.tell())
        self.check_writes(truncate_here)

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will
        # be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as exc:
            written = exc.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(), b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        flushed = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(flushed.startswith(b"01234567A"), flushed)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        raw = self.MockRawIO()
        bufio = self.tp(raw, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", raw._write_stack[0])

    def test_writelines(self):
        lines = [b'ab', b'cd', b'ef']
        raw = self.MockRawIO()
        bufio = self.tp(raw, 8)
        bufio.writelines(lines)
        bufio.flush()
        self.assertEqual(b''.join(raw._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        lines = UserList([b'ab', b'cd', b'ef'])
        raw = self.MockRawIO()
        bufio = self.tp(raw, 8)
        bufio.writelines(lines)
        bufio.flush()
        self.assertEqual(b''.join(raw._write_stack), b'abcdef')

    def test_writelines_error(self):
        raw = self.MockRawIO()
        bufio = self.tp(raw, 8)
        for bad_lines in ([1, 2, 3], None, 'abc'):
            with self.assertRaises(TypeError):
                bufio.writelines(bad_lines)

    def test_destructor(self):
        # After the writer is deleted and collected, the buffered data
        # must have reached the raw stream.
        raw = self.MockRawIO()
        bufio = self.tp(raw, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", raw._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as fobj:
            self.assertEqual(fobj.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            chunks = deque()
            offset = 0
            while offset < len(contents):
                size = next(sizes)
                chunks.append(contents[offset:offset + size])
                offset += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode,
                           buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []

                def worker():
                    # Drain the shared queue; record any failure before
                    # letting it propagate.
                    try:
                        while True:
                            try:
                                chunk = chunks.popleft()
                            except IndexError:
                                return
                            bufio.write(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise

                threads = [threading.Thread(target=worker)
                           for _ in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02)  # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as fobj:
                data = fobj.read()
            # Every byte value must occur exactly N times in the output.
            for value in range(256):
                self.assertEqual(data.count(bytes([value])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        raw = self.MisbehavedRawIO()
        bufio = self.tp(raw, 5)
        with self.assertRaises(OSError):
            bufio.seek(0)
        with self.assertRaises(OSError):
            bufio.tell()
        with self.assertRaises(OSError):
            bufio.write(b"abcdef")

    def test_max_buffer_size_removal(self):
        # A third positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()

        def bad_write(b):
            raise OSError()

        raw.write = bad_write
        bufio = self.tp(raw)
        bufio.write(b'spam')
        with self.assertRaises(OSError):  # exception not swallowed
            bufio.close()
        self.assertTrue(bufio.closed)
1719
Benjamin Peterson59406a92009-03-26 17:10:29 +00001720
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    """Run the buffered writer tests against io.BufferedWriter."""
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel, so only check larger builds.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        writer = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            writer.__init__(raw, sys.maxsize)

    def test_initialization(self):
        raw = self.MockRawIO()
        writer = self.tp(raw)
        # Each rejected buffer size is followed by a write() that must
        # fail as well.
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                writer.__init__(raw, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                writer.write(b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them
        # flushes all data to disk.
        # The Python version has __del__, so it ends up in gc.garbage
        # instead.
        with support.check_warnings(('', ResourceWarning)):
            raw = self.FileIO(support.TESTFN, "w+b")
            writer = self.tp(raw)
            writer.write(b"123xxx")
            writer.x = writer  # reference cycle through the object itself
            ref = weakref.ref(writer)
            del writer
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1764
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001765
class PyBufferedWriterTest(BufferedWriterTest):
    """Run the buffered writer tests against pyio.BufferedWriter."""
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001768
class BufferedRWPairTest(unittest.TestCase):
    """Implementation-independent tests for BufferedRWPair.

    Concrete subclasses supply the class under test through ``tp``.
    """

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_uninitialized(self):
        # Create and drop an object on which __init__ never ran.
        pair = self.tp.__new__(self.tp)
        del pair
        pair = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               pair.read, 0)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               pair.write, b'')
        pair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(pair.read(0), b'')
        self.assertEqual(pair.write(b''), 0)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        def make_pair():
            # readlines() consumes the reader, so each assertion gets a
            # fresh pair.
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

        every_line = [b"abc\n", b"def\n", b"h"]
        self.assertEqual(make_pair().readlines(), every_line)
        # None is accepted as "no hint", like read(None) in test_read.
        # (This replaces a second, identical readlines() assertion.)
        self.assertEqual(make_pair().readlines(None), every_line)
        # With a hint of 5 only the first two lines are returned.
        self.assertEqual(make_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")
        self.assertEqual(pair.read1(), b"def")

    def test_readinto(self):
        for method in ("readinto", "readinto1"):
            with self.subTest(method):
                pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

                data = byteslike(b'\0' * 5)
                self.assertEqual(getattr(pair, method)(data), 5)
                self.assertEqual(bytes(data), b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        buffer = bytearray(b"def")
        pair.write(buffer)
        buffer[:] = b"***"  # Overwrite our copy of the data
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() must not advance the position: the same bytes are read
        # afterwards.
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_reader_close_error_on_close(self):
        def reader_close():
            # Evaluating an undefined name raises NameError on purpose.
            reader_non_existing
        reader = self.MockRawIO()
        reader.close = reader_close
        writer = self.MockRawIO()
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        self.assertIn('reader_non_existing', str(err.exception))
        self.assertTrue(pair.closed)
        self.assertFalse(reader.closed)
        self.assertTrue(writer.closed)

    def test_writer_close_error_on_close(self):
        def writer_close():
            # Evaluating an undefined name raises NameError on purpose.
            writer_non_existing
        reader = self.MockRawIO()
        writer = self.MockRawIO()
        writer.close = writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        self.assertIn('writer_non_existing', str(err.exception))
        self.assertFalse(pair.closed)
        self.assertTrue(reader.closed)
        self.assertFalse(writer.closed)

    def test_reader_writer_close_error_on_close(self):
        def reader_close():
            # Evaluating an undefined name raises NameError on purpose.
            reader_non_existing
        def writer_close():
            # Evaluating an undefined name raises NameError on purpose.
            writer_non_existing
        reader = self.MockRawIO()
        reader.close = reader_close
        writer = self.MockRawIO()
        writer.close = writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        # The reader's error is the one raised; the writer's error is
        # chained to it as __context__.
        self.assertIn('reader_non_existing', str(err.exception))
        self.assertIsInstance(err.exception.__context__, NameError)
        self.assertIn('writer_non_existing', str(err.exception.__context__))
        self.assertFalse(pair.closed)
        self.assertFalse(reader.closed)
        self.assertFalse(writer.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side does.
        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

    def test_weakref_clearing(self):
        brw = self.tp(self.MockRawIO(), self.MockRawIO())
        ref = weakref.ref(brw)
        brw = None
        ref = None  # Shouldn't segfault.
1957
class CBufferedRWPairTest(BufferedRWPairTest):
    """Run the BufferedRWPair tests against io.BufferedRWPair."""
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001960
class PyBufferedRWPairTest(BufferedRWPairTest):
    """Run the BufferedRWPair tests against pyio.BufferedRWPair."""
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001963
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001964
1965class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1966 read_mode = "rb+"
1967 write_mode = "wb+"
1968
def test_constructor(self):
    # Run the reader's constructor test, then the writer's.
    for base in (BufferedReaderTest, BufferedWriterTest):
        base.test_constructor(self)
1972
def test_uninitialized(self):
    # Run the reader's uninitialized-object test, then the writer's.
    for base in (BufferedReaderTest, BufferedWriterTest):
        base.test_uninitialized(self)
1976
def test_read_and_write(self):
    raw = self.MockRawIO((b"asdf", b"ghjk"))
    bufio = self.tp(raw, 8)

    head = bufio.read(2)
    self.assertEqual(b"as", head)
    for payload in (b"ddd", b"eee"):
        bufio.write(payload)
    self.assertFalse(raw._write_stack)  # Buffer writes
    rest = bufio.read()
    self.assertEqual(b"ghjk", rest)
    self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001987
def test_seek_and_tell(self):
    raw = self.BytesIO(b"asdfghjkl")
    bufio = self.tp(raw)

    self.assertEqual(b"as", bufio.read(2))
    self.assertEqual(2, bufio.tell())
    bufio.seek(0, 0)
    self.assertEqual(b"asdf", bufio.read(4))

    # Overwrite four bytes in the middle, then read everything back.
    bufio.write(b"123f")
    bufio.seek(0, 0)
    self.assertEqual(b"asdf123fl", bufio.read())
    self.assertEqual(9, bufio.tell())
    # Seek relative to the end, then relative to the current position.
    bufio.seek(-4, 2)
    self.assertEqual(5, bufio.tell())
    bufio.seek(2, 1)
    self.assertEqual(7, bufio.tell())
    self.assertEqual(b"fl", bufio.read(11))
    bufio.flush()
    self.assertEqual(b"asdf123fl", raw.getvalue())

    # A float offset is rejected.
    with self.assertRaises(TypeError):
        bufio.seek(0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00002010
def check_flush_and_read(self, read_func):
    # read_func(bufio[, n]) must return the bytes it consumed from bufio;
    # without n it is expected to return everything that is left.
    raw = self.BytesIO(b"abcdefghi")
    bufio = self.tp(raw)

    first = read_func(bufio, 2)
    self.assertEqual(b"ab", first)
    bufio.write(b"12")
    after_write = read_func(bufio, 2)
    self.assertEqual(b"ef", after_write)
    self.assertEqual(6, bufio.tell())
    bufio.flush()
    self.assertEqual(6, bufio.tell())
    remainder = read_func(bufio)
    self.assertEqual(b"ghi", remainder)
    # Change the raw stream behind the buffer's back.
    raw.seek(0, 0)
    raw.write(b"XYZ")
    # flush() resets the read buffer
    bufio.flush()
    bufio.seek(0, 0)
    self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002028
def test_flush_and_read(self):
    def _read(bufio, *args):
        # Plain read(), with or without a size argument.
        return bufio.read(*args)
    self.check_flush_and_read(_read)
2031
def test_flush_and_readinto(self):
    def _readinto(bufio, n=-1):
        # A negative n means "no limit": use an oversized target buffer.
        capacity = 9999 if n < 0 else n
        target = bytearray(capacity)
        filled = bufio.readinto(target)
        return bytes(target[:filled])
    self.check_flush_and_read(_readinto)
2038
def test_flush_and_peek(self):
    def _peek(bufio, n=-1):
        # This relies on the fact that the buffer can contain the whole
        # raw stream, otherwise peek() can return less.
        data = bufio.peek(n)
        if n != -1:
            data = data[:n]
        # peek() leaves the position alone, so step over what is returned.
        bufio.seek(len(data), 1)
        return data
    self.check_flush_and_read(_peek)
2049
def test_flush_and_write(self):
    raw = self.BytesIO(b"abcdefghi")
    bufio = self.tp(raw)

    # Two writes, each followed by its own flush.
    for payload in (b"123", b"45"):
        bufio.write(payload)
        bufio.flush()
    bufio.seek(0, 0)
    expected = b"12345fghi"
    self.assertEqual(expected, raw.getvalue())
    self.assertEqual(expected, bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002061
def test_threads(self):
    # Run the reader's thread test, then the writer's.
    for base in (BufferedReaderTest, BufferedWriterTest):
        base.test_threads(self)
2065
def test_writes_and_peek(self):
    def peek_here(bufio):
        bufio.peek(1)
    self.check_writes(peek_here)

    def peek_one_back(bufio):
        # Step one byte back, peek, then restore the position.
        here = bufio.tell()
        bufio.seek(-1, 1)
        bufio.peek(1)
        bufio.seek(here, 0)
    self.check_writes(peek_one_back)
2076
def test_writes_and_reads(self):
    def reread_last_byte(bufio):
        # Step one byte back and read one byte.
        bufio.seek(-1, 1)
        bufio.read(1)
    self.check_writes(reread_last_byte)
2082
def test_writes_and_read1s(self):
    def reread_last_byte(bufio):
        # Step one byte back and read one byte with read1().
        bufio.seek(-1, 1)
        bufio.read1(1)
    self.check_writes(reread_last_byte)
2088
def test_writes_and_readintos(self):
    def reread_last_byte(bufio):
        # Step one byte back and read one byte into a scratch buffer.
        bufio.seek(-1, 1)
        scratch = bytearray(1)
        bufio.readinto(scratch)
    self.check_writes(reread_last_byte)
2094
def test_write_after_readahead(self):
    # Issue #6629: writing after the buffer was filled by readahead should
    # first rewind the raw stream.
    for overwrite_size in [1, 5]:
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 4)
        # Trigger readahead
        self.assertEqual(bufio.read(1), b"A")
        self.assertEqual(bufio.tell(), 1)
        # Overwriting should rewind the raw stream if it needs so
        payload = b"B" * overwrite_size
        bufio.write(payload)
        end_pos = overwrite_size + 1
        self.assertEqual(bufio.tell(), end_pos)
        # If the write size was smaller than the buffer size, flush() and
        # check that rewind happens.
        bufio.flush()
        self.assertEqual(bufio.tell(), end_pos)
        expected = b"A" + payload + b"A" * (9 - overwrite_size)
        self.assertEqual(raw.getvalue(), expected)
2114
def test_write_rewind_write(self):
    # Various combinations of reading / writing / seeking backwards /
    # writing again
    def mutate(bufio, pos1, pos2):
        assert pos2 >= pos1
        # Fill the buffer
        bufio.seek(pos1)
        bufio.read(pos2 - pos1)
        bufio.write(b'\x02')
        # This writes earlier than the previous write, but still inside
        # the buffer.
        bufio.seek(pos1)
        bufio.write(b'\x01')

    original = b"\x80\x81\x82\x83\x84"
    size = len(original)
    for first in range(size):
        for second in range(first, size):
            raw = self.BytesIO(original)
            bufio = self.tp(raw, 100)
            mutate(bufio, first, second)
            bufio.flush()
            # Apply the later position first so that the earlier write
            # wins when both positions coincide.
            expected = bytearray(original)
            expected[second] = 2
            expected[first] = 1
            self.assertEqual(raw.getvalue(), expected,
                             "failed result for i=%d, j=%d" % (first, second))
2140
def test_truncate_after_read_or_write(self):
    # truncate() without argument must report the logical position, both
    # after a buffered read and after a buffered write.
    source = self.BytesIO(b"A" * 10)
    stream = self.tp(source, 100)
    # the read buffer gets filled
    chunk = stream.read(2)
    self.assertEqual(chunk, b"AA")
    self.assertEqual(stream.truncate(), 2)
    # the write buffer increases
    written = stream.write(b"BB")
    self.assertEqual(written, 2)
    self.assertEqual(stream.truncate(), 4)
2148
def test_misbehaved_io(self):
    # Run the reader flavour and then the writer flavour of this test
    # against the current instance.
    for base in (BufferedReaderTest, BufferedWriterTest):
        base.test_misbehaved_io(self)
2152
def test_interleaved_read_write(self):
    # Test for issue #12213
    # Alternate one-byte writes with each of the read flavours; every
    # read must return the byte following the one just written.
    with self.BytesIO(b'abcdefgh') as raw, self.tp(raw, 100) as bufio:
        bufio.write(b"1")
        self.assertEqual(bufio.read(1), b'b')
        bufio.write(b'2')
        self.assertEqual(bufio.read1(1), b'd')
        bufio.write(b'3')
        target = bytearray(1)
        bufio.readinto(target)
        self.assertEqual(target, b'f')
        bufio.write(b'4')
        self.assertEqual(bufio.peek(1), b'h')
        bufio.flush()
        self.assertEqual(raw.getvalue(), b'1b2d3f4h')

    # Same idea, but starting with a read instead of a write.
    with self.BytesIO(b'abc') as raw, self.tp(raw, 100) as bufio:
        self.assertEqual(bufio.read(1), b'a')
        bufio.write(b"2")
        self.assertEqual(bufio.read(1), b'c')
        bufio.flush()
        self.assertEqual(raw.getvalue(), b'a2c')
2177
def test_interleaved_readline_write(self):
    # Each one-byte write overwrites the first byte of a line; the
    # following readline() must return the rest of that line.
    with self.BytesIO(b'ab\ncdef\ng\n') as raw, self.tp(raw) as bufio:
        bufio.write(b'1')
        self.assertEqual(bufio.readline(), b'b\n')
        bufio.write(b'2')
        self.assertEqual(bufio.readline(), b'def\n')
        bufio.write(b'3')
        self.assertEqual(bufio.readline(), b'\n')
        bufio.flush()
        self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2189
# You can't construct a BufferedRandom over a non-seekable stream.
# NOTE(review): binding the name to None presumably disables a
# test_unseekable inherited from a base class -- confirm against the
# class header, which is outside this part of the file.
test_unseekable = None
2192
R David Murray67bfe802013-02-23 21:51:05 -05002193
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Binds the shared BufferedRandom tests to io.BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-initialising with a sys.maxsize buffer must fail with one of
        # these errors.
        acceptable = (OverflowError, MemoryError, ValueError)
        with self.assertRaises(acceptable):
            bufio.__init__(rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Reuse the reader and the writer garbage-collection tests.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError message must
        # mention "BufferedRandom".
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
2215
2216
# Binds the shared BufferedRandomTest suite to pyio.BufferedRandom through
# the `tp` attribute (pyio is presumably the pure-Python implementation
# mentioned in the file header -- confirm against the imports).
class PyBufferedRandomTest(BufferedRandomTest):
    tp = pyio.BufferedRandom
2219
2220
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002221# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2222# properties:
2223# - A single output character can correspond to many bytes of input.
2224# - The number of input bytes to complete the character can be
2225# undetermined until the last input byte is received.
2226# - The number of input bytes can vary depending on previous input.
2227# - A single input byte can correspond to many characters of output.
2228# - The number of output characters can be undetermined until the
2229# last input byte is received.
2230# - The number of output characters can vary depending on previous input.
2231
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  A new I or O applies to the bytes
    that follow the word which set it (the buffer is empty at that point).
    EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        """Initialise the base decoder and start with I = O = 1."""
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = 1, O = 1, nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags).

        I and O are each XOR-ed with 1 before being packed as
        ``i*100 + o`` so that flags == 0 right after reset().
        """
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        """Restore a state produced by getstate()."""
        pending, flags = state
        self.buffer = bytearray(pending)
        packed_i, packed_o = divmod(flags, 100)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        """Feed *input* (an iterable of byte values) and return the text
        produced by every word completed so far.

        With *final* true, a non-empty buffer is flushed as a last word.
        """
        pieces = []
        period = ord('.')
        for byte in input:
            if self.i:
                # Fixed-length mode: a word ends after self.i bytes.
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
            elif byte != period:
                # Variable-length mode: keep collecting until a period.
                self.buffer.append(byte)
            elif self.buffer:
                # A period ends the word; a period on an empty buffer is
                # ignored.
                pieces.append(self.process_word())
        if final and self.buffer:
            # EOF terminates the last word.
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return its output text.

        'i<number>' and 'o<number>' words update I and O (capped at 99,
        a missing number counts as 0) and produce no output.  Any other
        word is decoded as ASCII, padded with hyphens or truncated to O
        characters when O is non-zero, and followed by a period.
        The buffer is emptied afterwards.
        """
        marker = self.buffer[0]
        if marker == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
            output = ''
        elif marker == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
            output = ''
        else:
            word = self.buffer.decode('ascii')
            if self.o:
                # Pad short words with hyphens, cut long ones at O.
                word = word.ljust(self.o, '-')[:self.o]
            output = word + '.'
        self.buffer = bytearray()
        return output

    # Switch checked by lookupTestDecoder(); off unless a test turns it on.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the name 'test_decoder'.

        Returns None unless codecEnabled is true and *name* matches.
        Otherwise returns a CodecInfo that encodes with latin-1 and uses
        this class as its incremental decoder.
        """
        if not cls.codecEnabled or name != 'test_decoder':
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)
2316
# Register the lookup function of the decoder defined above with the
# codec registry.  The lookup returns nothing while
# StatefulIncrementalDecoder.codecEnabled is false, so the 'test_decoder'
# codec is disabled by default; tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2320
2321
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         ('a----------------------------.'
          'b----------------------------.'
          'cde--------------------------.'
          'abcdefghijabcde.'
          'a.b------------.'
          '.c.------------.'
          'd.e------------.'
          'k--------------.'
          'l--------------.'
          'm--------------.')),
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, final), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        partial = decoder.decode(b'oiabcd')
        self.assertEqual(partial, '')
        flushed = decoder.decode(b'', 1)
        self.assertEqual(flushed, 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002364
2365class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002366
def setUp(self):
    # Start every test without a leftover scratch file.
    support.unlink(support.TESTFN)
    # Sample payload mixing \r\n, \r and \n line endings, plus the same
    # text with every ending normalised to \n.
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    normalized_bytes = b"AAA\nBBB\nCCC\nDDD\nEEE\n"
    self.normalized = normalized_bytes.decode("ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002371
def tearDown(self):
    # Remove the scratch file named by support.TESTFN after each test.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002374
def test_constructor(self):
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)

    # Calling __init__ again must reconfigure the existing wrapper.
    text.__init__(buffered, encoding="latin-1", newline="\r\n")
    self.assertEqual(text.encoding, "latin-1")
    self.assertEqual(text.line_buffering, False)

    text.__init__(buffered, encoding="utf-8", line_buffering=True)
    self.assertEqual(text.encoding, "utf-8")
    self.assertEqual(text.line_buffering, True)
    # The two payload bytes decode to a single character under UTF-8.
    self.assertEqual("\xe9\n", text.readline())

    # Invalid newline arguments: wrong type, then wrong value.
    with self.assertRaises(TypeError):
        text.__init__(buffered, newline=42)
    with self.assertRaises(ValueError):
        text.__init__(buffered, newline='xyzzy')
2388
def test_uninitialized(self):
    wrapper_type = self.TextIOWrapper
    # Create an object that never went through __init__ and drop it
    # again right away.
    t = wrapper_type.__new__(wrapper_type)
    del t
    # A second uninitialized object must fail cleanly when used.
    t = wrapper_type.__new__(wrapper_type)
    with self.assertRaises(Exception):
        repr(t)
    with self.assertRaisesRegex((ValueError, AttributeError),
                                'uninitialized|has no attribute'):
        t.read(0)
    # Once initialized, the very same object works.
    t.__init__(self.MockRawIO())
    self.assertEqual(t.read(0), '')
2399
def test_non_text_encoding_codecs_are_rejected(self):
    # Ensure the constructor complains if passed a codec that isn't
    # marked as a text encoding
    # http://bugs.python.org/issue20404
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    expected_message = "is not a text encoding"
    with self.assertRaisesRegex(LookupError, expected_message):
        self.TextIOWrapper(buffered, encoding="hex")
2408
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)

    # detach() hands back the very object the wrapper was built on.
    text = self.TextIOWrapper(buffered)
    self.assertIs(text.detach(), buffered)

    # Pending text reaches the raw stream only once detach() is called;
    # detaching a second time is an error.
    text = self.TextIOWrapper(buffered, encoding="ascii")
    text.write("howdy")
    self.assertFalse(raw.getvalue())
    text.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    with self.assertRaises(ValueError):
        text.detach()

    # Operations independent of the detached stream should still work
    repr(text)
    self.assertEqual(text.encoding, "ascii")
    self.assertEqual(text.errors, "strict")
    self.assertFalse(text.line_buffering)
2427
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    def expect_repr(template):
        # template carries a single %s slot for the module name.
        self.assertEqual(repr(text), template % modname)

    expect_repr("<%s.TextIOWrapper encoding='utf-8'>")
    # A name on the raw stream shows up in the repr.
    raw.name = "dummy"
    expect_repr("<%s.TextIOWrapper name='dummy' encoding='utf-8'>")
    # So does a mode set on the wrapper itself.
    text.mode = "r"
    expect_repr("<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>")
    # A bytes name is rendered with its b'' prefix.
    raw.name = b"dummy"
    expect_repr("<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>")

    text.buffer.detach()
    repr(text)  # Should not raise an exception
2447
def test_recursive_repr(self):
    # Issue #25455
    # Make the raw stream's name point back at the wrapper.  A
    # RuntimeError from repr() is tolerated; anything worse is not.
    backing = self.BytesIO()
    wrapper = self.TextIOWrapper(backing)
    with support.swap_attr(backing, 'name', wrapper):
        try:
            repr(wrapper)  # Should not crash
        except RuntimeError:
            pass
2457
def test_line_buffering(self):
    sink = self.BytesIO()
    buffered = self.BufferedWriter(sink, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)

    text.write("X")
    self.assertEqual(sink.getvalue(), b"")  # No flush happened
    text.write("Y\nZ")
    self.assertEqual(sink.getvalue(), b"XY\nZ")  # All got flushed
    # This write has no "\n", only a "\r", and must reach the sink too.
    text.write("A\rB")
    self.assertEqual(sink.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002468
def test_default_encoding(self):
    saved_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding to check that TextIOWrapper() uses the current
        # locale encoding and not the user preferred encoding
        for name in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(name, None)

        expected = locale.getpreferredencoding(False)
        wrapper = self.TextIOWrapper(self.BytesIO())
        self.assertEqual(wrapper.encoding, expected)
    finally:
        # Put back exactly the environment found on entry.
        os.environ.clear()
        os.environ.update(saved_environ)
2486
@support.cpython_only
def test_device_encoding(self):
    # Issue 15989
    import _testcapi
    stream = self.BytesIO()
    # A fileno() result just above INT_MAX, then just above UINT_MAX,
    # must make the constructor raise OverflowError.
    for out_of_range in (_testcapi.INT_MAX + 1, _testcapi.UINT_MAX + 1):
        stream.fileno = lambda value=out_of_range: value
        with self.assertRaises(OverflowError):
            self.TextIOWrapper(stream)
2496
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()
    explicit = self.TextIOWrapper(raw, encoding="utf-8")
    self.assertEqual(explicit.encoding, "utf-8")
    # Without an explicit encoding the attribute must still name a
    # codec that codecs.lookup() accepts.
    implicit = self.TextIOWrapper(raw)
    self.assertIsNotNone(implicit.encoding)
    codecs.lookup(implicit.encoding)
2505
def test_encoding_errors_reading(self):
    # The payload contains one byte (0xff) that is not valid ASCII.
    payload = b"abc\n\xff\n"

    # (1) default and (2) explicit strict: reading must fail.
    for options in ({}, {"errors": "strict"}):
        reader = self.TextIOWrapper(self.BytesIO(payload),
                                    encoding="ascii", **options)
        self.assertRaises(UnicodeError, reader.read)

    # (3) ignore drops the bad byte, (4) replace substitutes U+FFFD.
    for handler, decoded in (("ignore", "abc\n\n"),
                             ("replace", "abc\n\ufffd\n")):
        reader = self.TextIOWrapper(self.BytesIO(payload),
                                    encoding="ascii", errors=handler)
        self.assertEqual(reader.read(), decoded)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002523
def test_encoding_errors_writing(self):
    # (1) default and (2) explicit strict: writing a character that
    # ASCII cannot encode must fail.
    for options in ({}, {"errors": "strict"}):
        writer = self.TextIOWrapper(self.BytesIO(),
                                    encoding="ascii", **options)
        self.assertRaises(UnicodeError, writer.write, "\xff")

    # (3) ignore drops the character, (4) replace writes "?" instead.
    for handler, encoded in (("ignore", b"abcdef\n"),
                             ("replace", b"abc?def\n")):
        sink = self.BytesIO()
        writer = self.TextIOWrapper(sink, encoding="ascii", errors=handler,
                                    newline="\n")
        writer.write("abc\xffdef\n")
        writer.flush()
        self.assertEqual(sink.getvalue(), encoded)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002547
def test_newlines(self):
    input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

    # (newline argument, lines expected back from the wrapper)
    tests = [
        [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
        [ '', input_lines ],
        [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
        [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
        [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    def lines_via_read_then_readline(stream):
        # Read two characters, then the remainder of the line, until
        # read() reports end of input with an empty string.
        collected = []
        for head in iter(lambda: stream.read(2), ''):
            self.assertEqual(len(head), 2)
            collected.append(head + stream.readline())
        return collected

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        data = ''.join(input_lines).encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if not do_reads:
                        got_lines = list(textio)
                    else:
                        got_lines = lines_via_read_then_readline(textio)

                    # Compare line by line first, then the line counts.
                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002589
def test_newlines_input(self):
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    # The same data with every \r\n and lone \r turned into \n.
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    # (newline argument, lines expected from readlines())
    cases = [
        (None, normalized.decode("ascii").splitlines(keepends=True)),
        ("", testdata.decode("ascii").splitlines(keepends=True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        reader = self.TextIOWrapper(self.BytesIO(testdata),
                                    encoding="ascii", newline=newline)
        self.assertEqual(reader.readlines(), expected)
        # A single read() from the start must give the same text.
        reader.seek(0)
        self.assertEqual(reader.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002605
def test_newlines_output(self):
    # Bytes expected on disk for each explicit newline argument.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is expected to behave like newline=os.linesep.
    tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
    chunks = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in tests:
        sink = self.BytesIO()
        writer = self.TextIOWrapper(sink, encoding="ascii", newline=newline)
        for chunk in chunks:
            writer.write(chunk)
        writer.flush()
        self.assertEqual(sink.closed, False)
        self.assertEqual(sink.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002623
def test_destructor(self):
    # Dropping the last reference to the wrapper must close the
    # underlying stream with the pending text already in it.
    seen_at_close = []
    parent = self.BytesIO

    class RecordingBytesIO(parent):
        def close(self):
            # Record the content at the moment of closing.
            seen_at_close.append(self.getvalue())
            parent.close(self)

    sink = RecordingBytesIO()
    writer = self.TextIOWrapper(sink, encoding="ascii")
    writer.write("abc")
    del writer
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002637
def test_override_destructor(self):
    # Each override appends its own marker: 1 for __del__, 2 for close(),
    # 3 for flush().  Destroying the object must produce [1, 2, 3].
    calls = []

    class TracingTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # Chain to the parent destructor only if one exists.
            try:
                parent_del = super().__del__
            except AttributeError:
                pass
            else:
                parent_del()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    backing = self.BytesIO()
    wrapper = TracingTextIO(backing, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2660
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        # The temporary wrapper is destroyed while the AttributeError
        # raised by this lookup is propagating.
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception OSError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Guido van Rossum8358db22007-08-18 21:39:55 +00002675
Guido van Rossum9b76da62007-04-11 01:09:03 +00002676 # Systematic tests of the text I/O API
2677
def test_basic_io(self):
    # Write, read, seek and tell through a text file for several chunk
    # sizes and encodings.  The files are opened in `with` blocks so the
    # handle on support.TESTFN is closed even when an assertion fails;
    # a leaked handle can otherwise break the cleanup in tearDown().
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        # "utf-16-be" and "utf-16-le" are deliberately left out, as in
        # the original list.
        for enc in ("ascii", "latin-1", "utf-8"):
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)

            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Opaque position of the end of the initial content.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                # Append, then read the appended text back from the
                # previously recorded end position.
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2706
def multi_line_test(self, f, enc):
    # Rewrite f with lines of many different lengths, recording the
    # tell() value before each write.  Then read the lines back and
    # check that both the positions and the contents match.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    period = len(sample)
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build `size` characters.
        body = "".join(sample[k % period] for k in range(size))
        text = body + "\n"
        written.append((f.tell(), text))
        f.write(text)
    f.seek(0)
    read_back = []
    while True:
        where = f.tell()
        text = f.readline()
        if not text:
            break
        read_back.append((where, text))
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002728
def test_telling(self):
    # tell() must return the recorded positions again when the same
    # lines are read back.  The file is opened in a `with` block so the
    # handle on support.TESTFN is closed even when an assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to fail while iteration is in progress.
            self.assertRaises(OSError, f.tell)
        # Once the loop is exhausted, tell() works again.
        self.assertEqual(f.tell(), p2)
2748
def test_seeking(self):
    # The ASCII prefix is two characters shorter than the default chunk
    # size and is followed by a multi-byte character (presumably so that
    # this character straddles a chunk boundary -- confirm against
    # _default_chunk_size()).
    prefix_size = _default_chunk_size() - 2
    prefix_text = "a" * prefix_size
    prefix_bytes = prefix_text.encode("utf-8")
    self.assertEqual(len(prefix_text), len(prefix_bytes))
    suffix_text = "\u8888\n"
    suffix_bytes = suffix_text.encode("utf-8")
    one_line = prefix_bytes + suffix_bytes
    with self.open(support.TESTFN, "wb") as out:
        out.write(one_line * 2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as src:
        head = src.read(prefix_size)
        self.assertEqual(head, prefix_bytes.decode("ascii"))
        self.assertEqual(src.tell(), prefix_size)
        self.assertEqual(src.readline(), suffix_text)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002765
def test_seeking_too(self):
    # Regression test for a specific bug
    # One three-byte UTF-8 character followed by a newline, read back
    # with a chunk size smaller than that character.
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as out:
        out.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as src:
        src._CHUNK_SIZE  # Just test that it exists
        src._CHUNK_SIZE = 2
        src.readline()
        # Only checks that tell() completes without raising.
        src.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002776
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened through a context manager so that a failing
        # assertion cannot leave an open handle on TESTFN behind.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        # NOTE(review): only the reference read above uses CHUNK_SIZE; the
        # files opened in the loop keep their default _CHUNK_SIZE.  Kept
        # as in the original -- confirm whether that is intended.
        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Seeking back to the cookie must restore the state
                    # needed to decode the remainder of the stream.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for raw, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(raw)

        # Position each test case so that it crosses a chunk boundary.
        for raw, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(raw)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + raw, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002823
def test_encoded_writes(self):
    payload = "1234567890"
    doubled = payload * 2
    codecs_under_test = (
        "utf-16", "utf-16-le", "utf-16-be",
        "utf-32", "utf-32-le", "utf-32-be",
    )
    for codec in codecs_under_test:
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding=codec)
        # Check if the BOM is written only once (see issue1753).
        wrapper.write(payload)
        wrapper.write(payload)
        # Read everything back from the start, twice in a row.
        for _ in range(2):
            wrapper.seek(0)
            self.assertEqual(wrapper.read(), doubled)
        self.assertEqual(raw.getvalue(), doubled.encode(codec))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002843
def test_unreadable(self):
    # read() on a wrapper whose buffer reports readable() == False
    # must raise OSError.
    class UnReadable(self.BytesIO):
        def readable(self):
            return False

    wrapper = self.TextIOWrapper(UnReadable())
    with self.assertRaises(OSError):
        wrapper.read()
Benjamin Petersona1b49012009-03-31 23:11:32 +00002850
def test_read_one_by_one(self):
    # Reading one character at a time must still translate "\r\n" to "\n".
    txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    # iter() with a sentinel stops at the first empty read (end of input).
    chars = list(iter(lambda: txt.read(1), ""))
    self.assertEqual("".join(chars), "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002860
def test_readlines(self):
    txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]
    # No hint: all lines are returned.
    self.assertEqual(txt.readlines(), every_line)
    # An explicit hint of None returns everything; a hint of 5 returns
    # only the first two lines.
    for hint, expected in ((None, every_line), (5, every_line[:2])):
        txt.seek(0)
        self.assertEqual(txt.readlines(hint), expected)
2868
# read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    pieces = []
    # Keep reading 128 characters at a time until an empty read.
    for piece in iter(lambda: txt.read(128), ""):
        pieces.append(piece)
    self.assertEqual("".join(pieces), "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002880
def test_writelines(self):
    # writelines() with a plain list writes the items back to back.
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink)
    wrapper.writelines(['ab', 'cd', 'ef'])
    wrapper.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
2888
def test_writelines_userlist(self):
    # Same as test_writelines, but the lines come from a UserList.
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink)
    wrapper.writelines(UserList(['ab', 'cd', 'ef']))
    wrapper.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
2896
def test_writelines_error(self):
    # writelines() rejects non-str items, None, and a bytes object.
    txt = self.TextIOWrapper(self.BytesIO())
    for bad_lines in ([1, 2, 3], None, b'abc'):
        with self.assertRaises(TypeError):
            txt.writelines(bad_lines)
2902
def test_issue1395_1(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

    # read one char at a time, stopping at the first empty read
    collected = []
    for char in iter(lambda: txt.read(1), ""):
        collected.append(char)
    self.assertEqual("".join(collected), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002914
def test_issue1395_2(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Read in blocks of four characters (the chunk size set above)
    # until an empty read.
    blocks = list(iter(lambda: txt.read(4), ""))
    self.assertEqual("".join(blocks), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002926
def test_issue1395_3(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Mix two fixed-size reads with three readline() calls, in that order.
    pieces = [txt.read(4), txt.read(4)]
    for _ in range(3):
        pieces.append(txt.readline())
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002937
def test_issue1395_4(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # One fixed-size read followed by a read of everything that is left.
    head = txt.read(4)
    tail = txt.read()
    self.assertEqual(head + tail, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002945
def test_issue1395_5(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Consume the first four characters; the text itself is not needed,
    # only the position it leaves the stream at.
    txt.read(4)
    pos = txt.tell()
    # Seeking away and back to the saved position must resume reading
    # at the same place.
    txt.seek(0)
    txt.seek(pos)
    self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002955
def test_issue2282(self):
    # The wrapper must report the same seekable() value as its buffer.
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    self.assertEqual(raw.seekable(), wrapper.seekable())
2961
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # The position is not used by this test; the call is kept so
            # the sequence of operations on the file is unchanged.
            f.tell()
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(filename, 'rb') as f:
            # Encoding the whole text at once yields a single BOM, so a
            # second BOM written by the append would make this differ.
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002976
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as writer:
            writer.write('aaa')
            end_of_text = writer.tell()
        with self.open(path, 'r+', encoding=codec) as updater:
            # Write after the existing text first, then overwrite it
            # from the start of the file.
            updater.seek(end_of_text)
            updater.write('zzz')
            updater.seek(0)
            updater.write('bbb')
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'bbbzzz'.encode(codec))
Antoine Pitroue4501852009-05-14 18:55:55 +00002991
def test_seek_append_bom(self):
    # Same test, but first seek to the start and then to the end
    path = support.TESTFN
    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as writer:
            writer.write('aaa')
        with self.open(path, 'a', encoding=codec) as appender:
            appender.seek(0)
            appender.seek(0, self.SEEK_END)
            appender.write('xxx')
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'aaaxxx'.encode(codec))
3004
def test_errors_property(self):
    # .errors is "strict" when no handler is given, otherwise it is the
    # handler name passed to open().
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for open_kwargs, expected in cases:
        with self.open(support.TESTFN, "w", **open_kwargs) as f:
            self.assertEqual(f.errors, expected)
3010
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    go = threading.Event()
    expected_lines = ["Thread%03d\n" % n for n in range(20)]
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def writer(line):
            # Wait for the shared event, then write this thread's line.
            go.wait()
            f.write(line)
        workers = [threading.Thread(target=writer, args=(line,))
                   for line in expected_lines]
        with support.start_threads(workers, go.set):
            time.sleep(0.02)
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Each thread's line must appear exactly once.
        for line in expected_lines:
            self.assertEqual(content.count(line), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003029
def test_flush_error_on_close(self):
    # Test that text file is closed despite failed flush
    # and that flush() is called before file closed.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    observed = []

    def failing_flush():
        # Record the closed flags as seen from inside flush(), then fail.
        observed[:] = [txt.closed, txt.buffer.closed]
        raise OSError()

    txt.flush = failing_flush
    with self.assertRaises(OSError):  # exception not swallowed
        txt.close()
    self.assertTrue(txt.closed)
    self.assertTrue(txt.buffer.closed)
    self.assertTrue(observed)      # flush() called
    wrapper_was_closed, buffer_was_closed = observed
    self.assertFalse(wrapper_was_closed)  # flush() called before file closed
    self.assertFalse(buffer_was_closed)
    txt.flush = lambda: None  # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003046
def test_close_error_on_close(self):
    # When both flush() and the buffer's close() fail, close() must raise
    # the close error with the flush error chained as its __context__.
    buffer = self.BytesIO(self.testdata)
    def bad_flush():
        raise OSError('flush')
    def bad_close():
        raise OSError('close')
    buffer.close = bad_close
    txt = self.TextIOWrapper(buffer, encoding="ascii")
    txt.flush = bad_flush
    with self.assertRaises(OSError) as err: # exception not swallowed
        txt.close()
    self.assertEqual(err.exception.args, ('close',))
    self.assertIsInstance(err.exception.__context__, OSError)
    self.assertEqual(err.exception.__context__.args, ('flush',))
    self.assertFalse(txt.closed)

    # The wrapper is still open, so finalization would call the failing
    # stubs again.  Replace them with no-ops to silence those destructor
    # errors; this also drops the txt -> bad_flush reference.
    buffer.close = lambda: None
    txt.flush = lambda: None
3062
def test_nonnormalized_close_error_on_close(self):
    # Issue #21677
    # Same as test_close_error_on_close, but the errors are NameErrors
    # produced by referring to names that do not exist.
    buffer = self.BytesIO(self.testdata)
    def bad_flush():
        raise non_existing_flush
    def bad_close():
        raise non_existing_close
    buffer.close = bad_close
    txt = self.TextIOWrapper(buffer, encoding="ascii")
    txt.flush = bad_flush
    with self.assertRaises(NameError) as err: # exception not swallowed
        txt.close()
    self.assertIn('non_existing_close', str(err.exception))
    self.assertIsInstance(err.exception.__context__, NameError)
    self.assertIn('non_existing_flush', str(err.exception.__context__))
    self.assertFalse(txt.closed)

    # The wrapper is still open, so finalization would call the failing
    # stubs again.  Replace them with no-ops to silence those destructor
    # errors; this also drops the txt -> bad_flush reference.
    buffer.close = lambda: None
    txt.flush = lambda: None
3079
def test_multi_close(self):
    # close() may be called repeatedly; flush() on the closed wrapper
    # raises ValueError.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    for _ in range(3):
        txt.close()
    with self.assertRaises(ValueError):
        txt.flush()
3086
def test_unseekable(self):
    # tell() and seek() on a wrapper around MockUnseekableIO must raise
    # UnsupportedOperation.
    txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        txt.tell()
    with self.assertRaises(self.UnsupportedOperation):
        txt.seek(0)
3091
def test_readonly_attributes(self):
    # Assigning to .buffer must raise AttributeError.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    self.assertRaises(AttributeError, setattr, txt, "buffer", replacement)
3097
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    txt = self.TextIOWrapper(self.MockRawIO(chunks),
                             encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(txt.read(4), 'abcd')
    self.assertEqual(txt.readline(), 'efghi\n')
    remaining = list(txt)
    self.assertEqual(remaining, ['jkl\n', 'opq\n'])
3108
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                             write_through=True)
    for piece in ('1', '23\n4', '5'):
        txt.write(piece)
    # No flush() was issued, yet all written data is in the raw object.
    written = b''.join(raw._write_stack)
    self.assertEqual(written, b'123\n45')
3118
def test_bufio_write_through(self):
    # Issue #21396: write_through=True doesn't force a flush()
    # on the underlying binary buffered object.
    flush_log = []
    write_log = []

    class RecordingWriter(self.BufferedWriter):
        # Notes every flush()/write() call, then defers to the base class.
        def flush(self, *args, **kwargs):
            flush_log.append(True)
            return super().flush(*args, **kwargs)

        def write(self, *args, **kwargs):
            write_log.append(True)
            return super().write(*args, **kwargs)

    rawio = self.BytesIO()
    data = b"a"
    bufio = RecordingWriter(rawio, len(data)*2)
    textio = self.TextIOWrapper(bufio, encoding='ascii',
                                write_through=True)
    # write to the buffered io but don't overflow the buffer
    text = data.decode('ascii')
    textio.write(text)

    # buffer.flush is not called with write_through=True
    self.assertFalse(flush_log)
    # buffer.write *is* called with write_through=True
    self.assertTrue(write_log)
    self.assertEqual(rawio.getvalue(), b"")  # no flush

    del write_log[:]  # reset
    textio.write(text * 10)  # total content is larger than bufio buffer
    self.assertTrue(write_log)
    self.assertEqual(rawio.getvalue(), data * 11)  # all flushed
3150
def test_read_nonbytes(self):
    # Issue #17106
    # Crash when underlying read() returns non-bytes
    # Each call gets a fresh wrapper around a StringIO.
    for method, args in (("read", (1,)), ("readline", ()), ("read", ())):
        t = self.TextIOWrapper(self.StringIO('a'))
        self.assertRaises(TypeError, getattr(t, method), *args)
3160
def test_illegal_decoder(self):
    # Issue #17106
    # Bypass the early encoding check added in issue 20404
    def _make_illegal_wrapper():
        codec_info = codecs.lookup("quopri")
        # Temporarily flag quopri as a text encoding so the constructor
        # accepts it; the flag is cleared again whatever happens.
        codec_info._is_text_encoding = True
        try:
            return self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
                                      newline='\n', encoding="quopri")
        finally:
            codec_info._is_text_encoding = False

    # Crash when decoder returns non-string
    for method, args in (("read", (1,)), ("readline", ()), ("read", ())):
        t = _make_illegal_wrapper()
        self.assertRaises(TypeError, getattr(t, method), *args)
3180
def _check_create_at_shutdown(self, **kwargs):
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    #
    # Runs a child interpreter via assert_python_ok() and returns its
    # result.  The script creates a TextIOWrapper, using the given keyword
    # arguments, from the __del__ of a module-level object.
    template = """if 1:
            import codecs
            import {iomod} as io

            # Avoid looking up codecs at shutdown
            codecs.lookup('utf-8')

            class C:
                def __init__(self):
                    self.buf = io.BytesIO()
                def __del__(self):
                    io.TextIOWrapper(self.buf, **{kwargs})
                    print("ok")
            c = C()
            """
    script = template.format(iomod=self.io.__name__, kwargs=kwargs)
    return assert_python_ok("-c", script)
3201
@support.requires_type_collecting
def test_create_at_shutdown_without_encoding(self):
    rc, out, err = self._check_create_at_shutdown()
    if not err:
        self.assertEqual("ok", out.decode().strip())
        return
    # Can error out with a RuntimeError if the module state
    # isn't found.
    self.assertIn(self.shutdown_error, err.decode())
3211
@support.requires_type_collecting
def test_create_at_shutdown_with_encoding(self):
    # With encoding and errors given explicitly, the child must print
    # "ok" and produce no error output.
    explicit = {'encoding': 'utf-8', 'errors': 'strict'}
    rc, out, err = self._check_create_at_shutdown(**explicit)
    self.assertFalse(err)
    self.assertEqual("ok", out.decode().strip())
3218
def test_read_byteslike(self):
    source = MemviewBytesIO(b'Just some random string\n')
    wrapper = self.TextIOWrapper(source, 'utf-8')

    # TextIOwrapper will not read the full string, because
    # we truncate it to a multiple of the native int size
    # so that we can construct a more complex memoryview.
    truncated = _to_memoryview(source.getvalue()).tobytes()
    expected_text = truncated.decode('utf-8')

    self.assertEqual(wrapper.read(200), expected_text)
3229
def test_issue22849(self):
    class F(object):
        # Minimal stand-in: only the three capability queries exist.
        def readable(self):
            return True

        def writable(self):
            return True

        def seekable(self):
            return True

    # Construct the wrapper repeatedly, deliberately ignoring any
    # exception (presumably raised because F has no tell() yet --
    # confirm against the issue).
    for attempt in range(10):
        try:
            self.TextIOWrapper(F(), encoding='utf-8')
        except Exception:
            pass

    # Once tell() is available, construction must not raise.
    F.tell = lambda x: 0
    t = self.TextIOWrapper(F(), encoding='utf-8')
3244
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003245
class MemviewBytesIO(io.BytesIO):
    '''A BytesIO object whose read method returns memoryviews
       rather than bytes'''

    def read1(self, len_):
        # Read through BytesIO.read1(), then wrap the result.
        chunk = super().read1(len_)
        return _to_memoryview(chunk)

    def read(self, len_):
        # Read through BytesIO.read(), then wrap the result.
        chunk = super().read(len_)
        return _to_memoryview(chunk)
3255
3256def _to_memoryview(buf):
3257 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3258
3259 arr = array.array('i')
3260 idx = len(buf) - len(buf) % arr.itemsize
3261 arr.frombytes(buf[:idx])
3262 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003263
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003264
class CTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests against the io module and adds
    # a few tests of its own.
    io = io
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        # Re-running __init__ with an invalid newline fails, and so does
        # a read() on the wrapper afterwards.
        with self.assertRaises(ValueError):
            wrapper.__init__(buffered, newline='xyzzy')
        with self.assertRaises(ValueError):
            wrapper.read()

        # repr() of an object created without __init__ must raise.
        wrapper = self.TextIOWrapper.__new__(self.TextIOWrapper)
        with self.assertRaises(Exception):
            repr(wrapper)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            raw_file = io.FileIO(support.TESTFN, "wb")
            buffered = self.BufferedWriter(raw_file)
            wrapper = self.TextIOWrapper(buffered, encoding="ascii")
            wrapper.write("456def")
            # Self-reference puts the wrapper in a cycle, so dropping the
            # last name leaves it to the garbage collector.
            wrapper.x = wrapper
            probe = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertIsNone(probe(), probe)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()
3309
3310
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests against the pyio module.
    shutdown_error = "LookupError: unknown encoding: ascii"
    io = pyio
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003314
3315
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Shared checks for IncrementalNewlineDecoder.  The class under test
    # is read from self.IncrementalNewlineDecoder, which is not defined
    # in this class.

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        _check_decode(b'\xe8\xa2\x88', "\u8888")

        # Feed the same three-byte character one byte at a time, twice.
        for _ in range(2):
            for piece, text in ((b'\xe8', ""),
                                (b'\xa2', ""),
                                (b'\x88', "\u8888")):
                _check_decode(piece, text)

        # A character cut short must be reported once input is final.
        _check_decode(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        final = {"final": True}
        # (input bytes, expected text, extra decode() keyword arguments);
        # the order matters because the decoder carries state over.
        steps = (
            (b'\n', "\n", {}),
            (b'\r', "", {}),
            (b'', "\n", final),
            (b'\r', "\n", final),

            (b'\r', "", {}),
            (b'a', "\na", {}),

            (b'\r\r\n', "\n\n", {}),
            (b'\r', "", {}),
            (b'\r', "\n", {}),
            (b'\na', "\na", {}),

            (b'\xe8\xa2\x88\r\n', "\u8888\n", {}),
            (b'\xe8\xa2\x88', "\u8888", {}),
            (b'\n', "\n", {}),
            (b'\xe8\xa2\x88\r', "\u8888", {}),
            (b'\n', "\n", {}),
        )
        for piece, text, extra in steps:
            _check_decode(piece, text, **extra)

    def check_newline_decoding(self, decoder, encoding):
        decoded = []
        if encoding is None:
            encoder = None
        else:
            encoder = codecs.getincrementalencoder(encoding)()

        def _units(s):
            # With an encoder: the encoded text, one byte at a time.
            # Without one: the text itself, one char at a time.
            if encoder is None:
                return s
            return [bytes([b]) for b in encoder.encode(s)]

        def _decode_unitwise(s):
            for unit in _units(s):
                decoded.append(decoder.decode(unit))

        self.assertEqual(decoder.newlines, None)
        # (text to feed, value of decoder.newlines expected afterwards)
        stages = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, seen_newlines in stages:
            _decode_unitwise(text)
            self.assertEqual(decoder.newlines, seen_newlines)
        _decode_unitwise("abc\r")
        self.assertEqual("".join(decoded), "abc\n\nabcabc\nabcabc")

        # After a reset the recorded newlines must be gone.
        decoder.reset()
        payload = "abc"
        if encoder is not None:
            encoder.reset()
            payload = encoder.encode(payload)
        self.assertEqual(decoder.decode(payload), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc:
                inner = codecs.getincrementaldecoder(enc)()
            else:
                inner = enc
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(wrapped)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # "\u0D00" and "\u0A00" are not newlines: they must pass
            # through unchanged and leave dec.newlines untouched.
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)

        for translate in (False, True):
            _check(self.IncrementalNewlineDecoder(None, translate=translate))
3421
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; every test comes from the base class.
    pass
3424
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; every test comes from the base class.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003427
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003428
Guido van Rossum01a27522007-03-07 01:00:12 +00003429# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003430
class MiscIOTest(unittest.TestCase):
    # Base class for miscellaneous io tests.  It is not run directly: the
    # concrete subclasses set ``io`` and load_tests() injects the remaining
    # names used through ``self`` (open, IOBase, BlockingIOError, ...) so the
    # same tests exercise both implementations.

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Every public name must exist and be either an exception, a SEEK_*
        # constant, open(), or a subclass of IOBase.
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertIsNotNone(obj, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        # Each file is managed by a ``with`` block so that a failing
        # assertion cannot leak an open file object.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        # Only the open() call is expected to emit the DeprecationWarning.
        with support.check_warnings(('', DeprecationWarning)):
            f = self.open(support.TESTFN, "U")
        with f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # g shares f's descriptor but does not own it (closefd=False).
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        # Every I/O operation on a closed file must raise ValueError,
        # whatever the mode and buffering the file was opened with.
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # Optional methods are only checked on the layers providing them.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
                self.assertRaises(ValueError, f.read1)
            if hasattr(f, "readall"):
                self.assertRaises(ValueError, f.readall)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            if hasattr(f, "readinto1"):
                self.assertRaises(ValueError, f.readinto1, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.readlines, 1)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # write() needs an argument of the type matching the mode.
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        class C(str):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        # Build a reference cycle between the exception and its argument;
        # the garbage collector must be able to break it.
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        """Check that raw, buffered and text files opened with self.open()
        are instances of exactly the expected ABCs of *abcmodule*."""
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)

    def _check_warn_on_dealloc(self, *args, **kwargs):
        """Open a file with the builtin open() and check that dropping the
        last reference without closing it emits a ResourceWarning that
        mentions the file's repr."""
        f = open(*args, **kwargs)
        r = repr(f)
        with self.assertWarns(ResourceWarning) as cm:
            f = None
            support.gc_collect()
        self.assertIn(r, str(cm.warning.args[0]))

    def test_warn_on_dealloc(self):
        self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
        self._check_warn_on_dealloc(support.TESTFN, "wb")
        self._check_warn_on_dealloc(support.TESTFN, "w")

    def _check_warn_on_dealloc_fd(self, *args, **kwargs):
        """Same check as _check_warn_on_dealloc(), but for files opened on a
        pipe file descriptor; also check that closefd=False is silent."""
        fds = []
        def cleanup_fds():
            for fd in fds:
                try:
                    os.close(fd)
                except OSError as e:
                    # Descriptors already closed by a file object are fine.
                    if e.errno != errno.EBADF:
                        raise
        self.addCleanup(cleanup_fds)
        r, w = os.pipe()
        fds += r, w
        self._check_warn_on_dealloc(r, *args, **kwargs)
        # When using closefd=False, there's no warning
        r, w = os.pipe()
        fds += r, w
        with support.check_no_resource_warning(self):
            open(r, *args, closefd=False, **kwargs)

    def test_warn_on_dealloc_fd(self):
        self._check_warn_on_dealloc_fd("rb", buffering=0)
        self._check_warn_on_dealloc_fd("rb")
        self._check_warn_on_dealloc_fd("r")

    def test_pickling(self):
        # Pickling file objects is forbidden
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+b", "buffering": 0},
            ]:
            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
                with self.open(support.TESTFN, **kwargs) as f:
                    self.assertRaises(TypeError, pickle.dumps, f, protocol)

    def test_nonblock_pipe_write_bigbuf(self):
        self._test_nonblock_pipe_write(16*1024)

    def test_nonblock_pipe_write_smallbuf(self):
        self._test_nonblock_pipe_write(1024)

    @unittest.skipUnless(hasattr(os, 'set_blocking'),
                         'os.set_blocking() required for this test')
    def _test_nonblock_pipe_write(self, bufsize):
        """Write through a non-blocking pipe with buffers of *bufsize* bytes
        and check that everything reported as written is read back."""
        sent = []
        received = []
        r, w = os.pipe()
        os.set_blocking(r, False)
        os.set_blocking(w, False)

        # To exercise all code paths in the C implementation we need
        # to play with buffer sizes.  For instance, if we choose a
        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
        # then we will never get a partial write of the buffer.
        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

        with rf, wf:
            for N in 9999, 73, 7574:
                try:
                    i = 0
                    # Write until the pipe is full and the write would block.
                    while True:
                        msg = bytes([i % 26 + 97]) * N
                        sent.append(msg)
                        wf.write(msg)
                        i += 1

                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    # Only the accepted prefix of the last message was sent.
                    sent[-1] = sent[-1][:e.characters_written]
                    received.append(rf.read())
                    msg = b'BLOCKED'
                    wf.write(msg)
                    sent.append(msg)

            # Drain the reader until the writer manages to flush everything.
            while True:
                try:
                    wf.flush()
                    break
                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    self.assertEqual(e.characters_written, 0)
                    received.append(rf.read())

            # read() returns None once no more data is available.
            received += iter(rf.read, None)

        sent, received = b''.join(sent), b''.join(received)
        self.assertEqual(sent, received)
        self.assertTrue(wf.closed)
        self.assertTrue(rf.closed)

    def test_create_fail(self):
        # 'x' mode fails if file is existing
        with self.open(support.TESTFN, 'w'):
            pass
        self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')

    def test_create_writes(self):
        # 'x' mode opens for writing
        with self.open(support.TESTFN, 'xb') as f:
            f.write(b"spam")
        with self.open(support.TESTFN, 'rb') as f:
            self.assertEqual(b"spam", f.read())

    def test_open_allargs(self):
        # there used to be a buffer overflow in the parser for rawmode
        self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3697
3698
class CMiscIOTest(MiscIOTest):
    # Runs the MiscIOTest suite against the ``io`` module and adds tests that
    # only apply to that implementation.
    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        class BadReader(self.io.BufferedIOBase):
            # read() hands back far more data than any caller asked for.
            def read(self, n=-1):
                return b'x' * 10**6

        reader = BadReader()
        target = bytearray(2)
        with self.assertRaises(ValueError):
            reader.readinto(target)

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        script = """if 1:
            import sys
            import time
            import threading

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """.format(stream_name=stream_name)
        res, _ = run_python_until_end("-c", script)
        err = res.err.decode()
        if res.rc == 0:
            # Clean exit: stderr may only hold the characters the child wrote.
            self.assertFalse(err.strip('.!'))
            return
        # Failure: should be a fatal error
        fatal_msg = ("Fatal Python error: could not acquire lock "
                     "for <_io.BufferedWriter name='<{stream_name}>'> "
                     "at interpreter shutdown, possibly due to "
                     "daemon threads").format(stream_name=stream_name)
        self.assertIn(fatal_msg, err)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stdout')

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stderr')
3752
3753
class PyMiscIOTest(MiscIOTest):
    # Runs the MiscIOTest suite against the ``pyio`` module.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003756
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003757
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    # Base class for tests of how the io layers react to SIGALRM arriving
    # during a read or write.  The concrete subclasses set ``io``.
    #
    # Every helper that arms signal.alarm() also cancels it in its
    # ``finally`` clause, so a failing test cannot leave an alarm pending
    # after tearDown() has restored the previous SIGALRM handler.

    def setUp(self):
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Default handler: raise ZeroDivisionError so tests can detect it.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter."""
        read_results = []
        def _read():
            # Keep SIGALRM out of this thread when the platform allows it.
            if hasattr(signal, 'pthread_sigmask'):
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        # Bound before the try block so the cleanup code can tell whether
        # the file object was actually created.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed just below.  Since
            # the other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                signal.alarm(0)
                t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    # Issue #22331: The test hangs on FreeBSD 7.2
    @support.requires_freebsd_version(8)
    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check that a write() issued from a signal handler while another
        write() is in progress either fails with a "reentrant call"
        RuntimeError or lets the handler's own exception propagate."""
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Cancel the alarm in case the loop ended before it fired.
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully."""
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so the cleanup code can tell whether
        # the file object was actually created.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Cancel the alarm in case the read failed before it fired.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully."""
        select = support.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        error = None
        def _read():
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                # Hand the failure over to the main thread.
                nonlocal error
                error = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: only re-arm, with alarm2 as the next handler.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start draining the pipe so write() can finish.
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so the cleanup code can tell whether
        # the file object was actually created.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error)
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Cancel any alarm still pending after a failure.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3965
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003966
class CSignalsTest(SignalsTest):
    # Runs the SignalsTest suite against the ``io`` module.
    io = io
3969
class PySignalsTest(SignalsTest):
    # Runs the SignalsTest suite against the ``pyio`` module.
    io = pyio

    # Reentrancy is deliberately not handled by _pyio (doing so would slow
    # it down even more), so the inherited reentrant-write tests are
    # disabled by overriding them with None.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3977
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003978
def load_tests(*args):
    """Build the test suite for this module (unittest ``load_tests`` hook).

    Before the suite is assembled, every listed test class whose name starts
    with "C" receives the public names of ``io`` (plus the ``C``-prefixed
    mock classes) as class attributes, and every class whose name starts
    with "Py" receives the ``pyio`` equivalents.  Other classes are left
    untouched.  The positional arguments passed by unittest are ignored.

    Returns a ``unittest.TestSuite`` holding the tests of all listed classes.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name : getattr(io, name) for name in all_members}
    py_io_ns = {name : getattr(pyio, name) for name in all_members}
    globs = globals()
    # Each mock exists in a "C" and a "Py" flavour at module level; expose
    # the matching flavour under the unprefixed name.
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # unittest.makeSuite() is deprecated (removed in Python 3.13); the
    # default loader's loadTestsFromTestCase() collects the same tests.
    loader = unittest.defaultTestLoader
    suite = unittest.TestSuite([loader.loadTestsFromTestCase(test)
                                for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004014
# Run the tests when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()