blob: e04baefef081d4a227911a116bff87b1b4d5ff96 [file] [log] [blame]
"""Unit tests for the io module."""

# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Berker Peksagce643912015-05-06 06:33:17 +030038from test.support.script_helper import assert_python_ok, run_python_until_end
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000047
try:
    import ctypes
except ImportError:
    def byteslike(*pos, **kw):
        """Create a bytes-like object from the arguments accepted by bytes().

        Fallback used when ctypes cannot be imported: the data is wrapped
        in an array of signed bytes.
        """
        return array.array("b", bytes(*pos, **kw))
else:
    class EmptyStruct(ctypes.Structure):
        """Field-less ctypes structure, resized to hold the payload."""

    def byteslike(*pos, **kw):
        """Create a bytes-like object having no string or sequence methods"""
        data = bytes(*pos, **kw)
        obj = EmptyStruct()
        # Grow the structure's buffer to the payload size, then copy the
        # payload in through a flat unsigned-byte view of that buffer.
        ctypes.resize(obj, len(data))
        memoryview(obj).cast("B")[:] = data
        return obj
63
def _default_chunk_size():
    """Get the default TextIOWrapper chunk size"""
    # Any readable text file will do; this source file is always available.
    with open(__file__, "r", encoding="latin-1") as f:
        return f._CHUNK_SIZE
68
69
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto()."""

    def __init__(self, read_stack=()):
        # Chunks handed out by readinto(), oldest first.  A None entry makes
        # the corresponding readinto() call return None.
        self._read_stack = list(read_stack)
        # Copy of every buffer passed to write(), in call order.
        self._write_stack = []
        # Total number of read calls made so far.
        self._reads = 0
        # Number of read calls made after the read stack was exhausted.
        self._extraneous_reads = 0

    def write(self, b):
        # Store a copy so that later changes to the caller's buffer do not
        # alter what was recorded.
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next queued chunk into *buf*.

        Returns the number of bytes copied, None when the next queued entry
        is None, or 0 when nothing is queued any more.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            # The whole chunk fits: consume it.
            del self._read_stack[0]
            buf[:n] = data
            return n
        else:
            # Fill the buffer and keep the remainder queued for the next call.
            buf[:] = data[:max_len]
            self._read_stack[0] = data[max_len:]
            return max_len

    def truncate(self, pos=None):
        return pos
125
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead on top of the C RawIOBase."""


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead on top of the pure-Python RawIOBase."""
131
132
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus an explicit read() method."""

    def read(self, n=None):
        """Return the next queued chunk, ignoring *n*.

        Returns b"" and counts an extraneous read once the queue is empty.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue is expected here; anything else
            # (e.g. KeyboardInterrupt) must not be swallowed.
            self._extraneous_reads += 1
            return b""
142
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO on top of the C RawIOBase."""


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO on top of the pure-Python RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000148
Guido van Rossuma9e20242007-03-08 00:43:48 +0000149
class MisbehavedRawIO(MockRawIO):
    """A MockRawIO whose methods report deliberately wrong results."""

    def write(self, b):
        # Claim twice as many bytes as were actually written.
        return super().write(b) * 2

    def read(self, n=None):
        # Hand back the queued chunk repeated twice.
        return super().read(n) * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Perform the real read, then report a count larger than the buffer.
        super().readinto(buf)
        return len(buf) * 5
166
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO on top of the C RawIOBase."""


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO on top of the pure-Python RawIOBase."""
172
173
class CloseFailureIO(MockRawIO):
    """A MockRawIO whose first close() call raises OSError."""

    # Shadows the base class's "closed" with a plain flag set by close().
    closed = 0

    def close(self):
        if not self.closed:
            # Mark as closed before raising so later calls do nothing.
            self.closed = 1
            raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000181
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO on top of the C RawIOBase."""


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO on top of the pure-Python RawIOBase."""
187
188
class MockFileIO:
    """Mixin recording the result size of every read() and readinto().

    Must be combined with a stream class whose constructor takes the
    initial data.
    """

    def __init__(self, data):
        # One entry per read()/readinto() call: the number of bytes
        # obtained, or None when the underlying call returned None.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        res = super().read(n)
        self.read_history.append(None if res is None else len(res))
        return res

    def readinto(self, b):
        res = super().readinto(b)
        self.read_history.append(res)
        return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000204
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO on top of the C BytesIO."""


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO on top of the pure-Python BytesIO."""
210
211
class MockUnseekableIO:
    """Mixin turning a stream into a non-seekable one.

    The concrete class must provide an ``UnsupportedOperation`` attribute
    naming the exception type to raise.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        raise self.UnsupportedOperation("not seekable")

    def tell(self, *args):
        raise self.UnsupportedOperation("not seekable")

    def truncate(self, *args):
        raise self.UnsupportedOperation("not seekable")
224
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """MockUnseekableIO on top of the C BytesIO."""
    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """MockUnseekableIO on top of the pure-Python BytesIO."""
    UnsupportedOperation = pyio.UnsupportedOperation
230
231
class MockNonBlockWriterIO:
    """Raw writer mock that can simulate a write that would block."""

    def __init__(self):
        # Chunks accepted by write() since the last pop_written().
        self._write_stack = []
        # Byte sequence at which write() stops; None means never block.
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and forget it."""
        s = b"".join(self._write_stack)
        self._write_stack[:] = []
        return s

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record *b*, stopping short at the blocker if one is set.

        Returns the number of bytes accepted, or None when the data starts
        with the blocker (which is then cancelled).
        """
        b = bytes(b)
        if self._blocker_char:
            try:
                n = b.index(self._blocker_char)
            except ValueError:
                pass
            else:
                if n > 0:
                    # write data up to the first blocker
                    self._write_stack.append(b[:n])
                    return n
                else:
                    # cancel blocker and indicate would block
                    self._blocker_char = None
                    return None
        self._write_stack.append(b)
        return len(b)
275
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO on top of the C RawIOBase."""
    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO on top of the pure-Python RawIOBase."""
    BlockingIOError = pyio.BlockingIOError
281
Guido van Rossuma9e20242007-03-08 00:43:48 +0000282
Guido van Rossum28524c72007-02-27 05:47:44 +0000283class IOTest(unittest.TestCase):
284
Neal Norwitze7789b12008-03-24 06:18:09 +0000285 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000286 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000287
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000288 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000289 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000290
Guido van Rossum28524c72007-02-27 05:47:44 +0000291 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000292 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000293 f.truncate(0)
294 self.assertEqual(f.tell(), 5)
295 f.seek(0)
296
297 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000298 self.assertEqual(f.seek(0), 0)
299 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000300 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000301 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000302 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000303 buffer = bytearray(b" world\n\n\n")
304 self.assertEqual(f.write(buffer), 9)
305 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000306 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000307 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000308 self.assertEqual(f.seek(-1, 2), 13)
309 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000310
Guido van Rossum87429772007-04-10 21:06:59 +0000311 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000312 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000313 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000314
Guido van Rossum9b76da62007-04-11 01:09:03 +0000315 def read_ops(self, f, buffered=False):
316 data = f.read(5)
317 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000318 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000319 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000320 self.assertEqual(bytes(data), b" worl")
321 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000322 self.assertEqual(f.readinto(data), 2)
323 self.assertEqual(len(data), 5)
324 self.assertEqual(data[:2], b"d\n")
325 self.assertEqual(f.seek(0), 0)
326 self.assertEqual(f.read(20), b"hello world\n")
327 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000328 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000329 self.assertEqual(f.seek(-6, 2), 6)
330 self.assertEqual(f.read(5), b"world")
331 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000332 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000333 self.assertEqual(f.seek(-6, 1), 5)
334 self.assertEqual(f.read(5), b" worl")
335 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000336 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000337 if buffered:
338 f.seek(0)
339 self.assertEqual(f.read(), b"hello world\n")
340 f.seek(6)
341 self.assertEqual(f.read(), b"world\n")
342 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000343 f.seek(0)
344 data = byteslike(5)
345 self.assertEqual(f.readinto1(data), 5)
346 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000347
Guido van Rossum34d69e52007-04-10 20:08:41 +0000348 LARGE = 2**31
349
Guido van Rossum53807da2007-04-10 19:01:47 +0000350 def large_file_ops(self, f):
351 assert f.readable()
352 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100353 try:
354 self.assertEqual(f.seek(self.LARGE), self.LARGE)
355 except (OverflowError, ValueError):
356 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000357 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000358 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000359 self.assertEqual(f.tell(), self.LARGE + 3)
360 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000361 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000362 self.assertEqual(f.tell(), self.LARGE + 2)
363 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000364 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000365 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000366 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
367 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000368 self.assertEqual(f.read(2), b"x")
369
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000370 def test_invalid_operations(self):
371 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000372 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000373 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000374 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000375 self.assertRaises(exc, fp.read)
376 self.assertRaises(exc, fp.readline)
377 with self.open(support.TESTFN, "wb", buffering=0) as fp:
378 self.assertRaises(exc, fp.read)
379 self.assertRaises(exc, fp.readline)
380 with self.open(support.TESTFN, "rb", buffering=0) as fp:
381 self.assertRaises(exc, fp.write, b"blah")
382 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000383 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000384 self.assertRaises(exc, fp.write, b"blah")
385 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000386 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000387 self.assertRaises(exc, fp.write, "blah")
388 self.assertRaises(exc, fp.writelines, ["blah\n"])
389 # Non-zero seeking from current or end pos
390 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
391 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000392
Martin Panter754aab22016-03-31 07:21:56 +0000393 def test_optional_abilities(self):
394 # Test for OSError when optional APIs are not supported
395 # The purpose of this test is to try fileno(), reading, writing and
396 # seeking operations with various objects that indicate they do not
397 # support these operations.
398
399 def pipe_reader():
400 [r, w] = os.pipe()
401 os.close(w) # So that read() is harmless
402 return self.FileIO(r, "r")
403
404 def pipe_writer():
405 [r, w] = os.pipe()
406 self.addCleanup(os.close, r)
407 # Guarantee that we can write into the pipe without blocking
408 thread = threading.Thread(target=os.read, args=(r, 100))
409 thread.start()
410 self.addCleanup(thread.join)
411 return self.FileIO(w, "w")
412
413 def buffered_reader():
414 return self.BufferedReader(self.MockUnseekableIO())
415
416 def buffered_writer():
417 return self.BufferedWriter(self.MockUnseekableIO())
418
419 def buffered_random():
420 return self.BufferedRandom(self.BytesIO())
421
422 def buffered_rw_pair():
423 return self.BufferedRWPair(self.MockUnseekableIO(),
424 self.MockUnseekableIO())
425
426 def text_reader():
427 class UnseekableReader(self.MockUnseekableIO):
428 writable = self.BufferedIOBase.writable
429 write = self.BufferedIOBase.write
430 return self.TextIOWrapper(UnseekableReader(), "ascii")
431
432 def text_writer():
433 class UnseekableWriter(self.MockUnseekableIO):
434 readable = self.BufferedIOBase.readable
435 read = self.BufferedIOBase.read
436 return self.TextIOWrapper(UnseekableWriter(), "ascii")
437
438 tests = (
439 (pipe_reader, "fr"), (pipe_writer, "fw"),
440 (buffered_reader, "r"), (buffered_writer, "w"),
441 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
442 (text_reader, "r"), (text_writer, "w"),
443 (self.BytesIO, "rws"), (self.StringIO, "rws"),
444 )
445 for [test, abilities] in tests:
446 if test is pipe_writer and not threading:
447 continue # Skip subtest that uses a background thread
448 with self.subTest(test), test() as obj:
449 readable = "r" in abilities
450 self.assertEqual(obj.readable(), readable)
451 writable = "w" in abilities
452 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000453
454 if isinstance(obj, self.TextIOBase):
455 data = "3"
456 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
457 data = b"3"
458 else:
459 self.fail("Unknown base class")
460
461 if "f" in abilities:
462 obj.fileno()
463 else:
464 self.assertRaises(OSError, obj.fileno)
465
466 if readable:
467 obj.read(1)
468 obj.read()
469 else:
470 self.assertRaises(OSError, obj.read, 1)
471 self.assertRaises(OSError, obj.read)
472
473 if writable:
474 obj.write(data)
475 else:
476 self.assertRaises(OSError, obj.write, data)
477
Martin Panter3ee147f2016-03-31 21:05:31 +0000478 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000479 pipe_reader, pipe_writer):
480 # Pipes seem to appear as seekable on Windows
481 continue
482 seekable = "s" in abilities
483 self.assertEqual(obj.seekable(), seekable)
484
Martin Panter754aab22016-03-31 07:21:56 +0000485 if seekable:
486 obj.tell()
487 obj.seek(0)
488 else:
489 self.assertRaises(OSError, obj.tell)
490 self.assertRaises(OSError, obj.seek, 0)
491
492 if writable and seekable:
493 obj.truncate()
494 obj.truncate(0)
495 else:
496 self.assertRaises(OSError, obj.truncate)
497 self.assertRaises(OSError, obj.truncate, 0)
498
Antoine Pitrou13348842012-01-29 18:36:34 +0100499 def test_open_handles_NUL_chars(self):
500 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300501 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100502
503 bytes_fn = bytes(fn_with_NUL, 'ascii')
504 with warnings.catch_warnings():
505 warnings.simplefilter("ignore", DeprecationWarning)
506 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100507
Guido van Rossum28524c72007-02-27 05:47:44 +0000508 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000509 with self.open(support.TESTFN, "wb", buffering=0) as f:
510 self.assertEqual(f.readable(), False)
511 self.assertEqual(f.writable(), True)
512 self.assertEqual(f.seekable(), True)
513 self.write_ops(f)
514 with self.open(support.TESTFN, "rb", buffering=0) as f:
515 self.assertEqual(f.readable(), True)
516 self.assertEqual(f.writable(), False)
517 self.assertEqual(f.seekable(), True)
518 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000519
Guido van Rossum87429772007-04-10 21:06:59 +0000520 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000521 with self.open(support.TESTFN, "wb") as f:
522 self.assertEqual(f.readable(), False)
523 self.assertEqual(f.writable(), True)
524 self.assertEqual(f.seekable(), True)
525 self.write_ops(f)
526 with self.open(support.TESTFN, "rb") as f:
527 self.assertEqual(f.readable(), True)
528 self.assertEqual(f.writable(), False)
529 self.assertEqual(f.seekable(), True)
530 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000531
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000532 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000533 with self.open(support.TESTFN, "wb") as f:
534 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
535 with self.open(support.TESTFN, "rb") as f:
536 self.assertEqual(f.readline(), b"abc\n")
537 self.assertEqual(f.readline(10), b"def\n")
538 self.assertEqual(f.readline(2), b"xy")
539 self.assertEqual(f.readline(4), b"zzy\n")
540 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000541 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000542 self.assertRaises(TypeError, f.readline, 5.3)
543 with self.open(support.TESTFN, "r") as f:
544 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000545
Guido van Rossum28524c72007-02-27 05:47:44 +0000546 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000547 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000548 self.write_ops(f)
549 data = f.getvalue()
550 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000551 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000552 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000553
Guido van Rossum53807da2007-04-10 19:01:47 +0000554 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000555 # On Windows and Mac OSX this test comsumes large resources; It takes
556 # a long time to build the >2GB file and takes >2GB of disk space
557 # therefore the resource must be enabled to run this test.
558 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600559 support.requires(
560 'largefile',
561 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000562 with self.open(support.TESTFN, "w+b", 0) as f:
563 self.large_file_ops(f)
564 with self.open(support.TESTFN, "w+b") as f:
565 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000566
567 def test_with_open(self):
568 for bufsize in (0, 1, 100):
569 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000570 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000571 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000572 self.assertEqual(f.closed, True)
573 f = None
574 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000575 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000576 1/0
577 except ZeroDivisionError:
578 self.assertEqual(f.closed, True)
579 else:
580 self.fail("1/0 didn't raise an exception")
581
Antoine Pitrou08838b62009-01-21 00:55:13 +0000582 # issue 5008
583 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000584 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000585 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000586 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000587 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000588 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000589 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000590 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300591 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000592
Guido van Rossum87429772007-04-10 21:06:59 +0000593 def test_destructor(self):
594 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000595 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000596 def __del__(self):
597 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000598 try:
599 f = super().__del__
600 except AttributeError:
601 pass
602 else:
603 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000604 def close(self):
605 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000606 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000607 def flush(self):
608 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000609 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000610 with support.check_warnings(('', ResourceWarning)):
611 f = MyFileIO(support.TESTFN, "wb")
612 f.write(b"xxx")
613 del f
614 support.gc_collect()
615 self.assertEqual(record, [1, 2, 3])
616 with self.open(support.TESTFN, "rb") as f:
617 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000618
619 def _check_base_destructor(self, base):
620 record = []
621 class MyIO(base):
622 def __init__(self):
623 # This exercises the availability of attributes on object
624 # destruction.
625 # (in the C version, close() is called by the tp_dealloc
626 # function, not by __del__)
627 self.on_del = 1
628 self.on_close = 2
629 self.on_flush = 3
630 def __del__(self):
631 record.append(self.on_del)
632 try:
633 f = super().__del__
634 except AttributeError:
635 pass
636 else:
637 f()
638 def close(self):
639 record.append(self.on_close)
640 super().close()
641 def flush(self):
642 record.append(self.on_flush)
643 super().flush()
644 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000645 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000646 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000647 self.assertEqual(record, [1, 2, 3])
648
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000649 def test_IOBase_destructor(self):
650 self._check_base_destructor(self.IOBase)
651
652 def test_RawIOBase_destructor(self):
653 self._check_base_destructor(self.RawIOBase)
654
655 def test_BufferedIOBase_destructor(self):
656 self._check_base_destructor(self.BufferedIOBase)
657
658 def test_TextIOBase_destructor(self):
659 self._check_base_destructor(self.TextIOBase)
660
Guido van Rossum87429772007-04-10 21:06:59 +0000661 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000662 with self.open(support.TESTFN, "wb") as f:
663 f.write(b"xxx")
664 with self.open(support.TESTFN, "rb") as f:
665 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000666
Guido van Rossumd4103952007-04-12 05:44:49 +0000667 def test_array_writes(self):
668 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000669 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000670 def check(f):
671 with f:
672 self.assertEqual(f.write(a), n)
673 f.writelines((a,))
674 check(self.BytesIO())
675 check(self.FileIO(support.TESTFN, "w"))
676 check(self.BufferedWriter(self.MockRawIO()))
677 check(self.BufferedRandom(self.MockRawIO()))
678 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000679
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000680 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000681 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000682 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000683
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000684 def test_read_closed(self):
685 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000686 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000687 with self.open(support.TESTFN, "r") as f:
688 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000689 self.assertEqual(file.read(), "egg\n")
690 file.seek(0)
691 file.close()
692 self.assertRaises(ValueError, file.read)
693
694 def test_no_closefd_with_filename(self):
695 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000696 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000697
698 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000699 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000700 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000701 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000702 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000703 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000704 self.assertEqual(file.buffer.raw.closefd, False)
705
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000706 def test_garbage_collection(self):
707 # FileIO objects are collected, and collecting them flushes
708 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000709 with support.check_warnings(('', ResourceWarning)):
710 f = self.FileIO(support.TESTFN, "wb")
711 f.write(b"abcxxx")
712 f.f = f
713 wr = weakref.ref(f)
714 del f
715 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300716 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000717 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000718 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000719
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000720 def test_unbounded_file(self):
721 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
722 zero = "/dev/zero"
723 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000724 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000725 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000726 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000727 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000728 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000729 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000730 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000731 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000732 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000733 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000734 self.assertRaises(OverflowError, f.read)
735
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200736 def check_flush_error_on_close(self, *args, **kwargs):
737 # Test that the file is closed despite failed flush
738 # and that flush() is called before file closed.
739 f = self.open(*args, **kwargs)
740 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000741 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200742 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200743 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000744 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200745 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600746 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200747 self.assertTrue(closed) # flush() called
748 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200749 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200750
751 def test_flush_error_on_close(self):
752 # raw file
753 # Issue #5700: io.FileIO calls flush() after file closed
754 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
755 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
756 self.check_flush_error_on_close(fd, 'wb', buffering=0)
757 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
758 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
759 os.close(fd)
760 # buffered io
761 self.check_flush_error_on_close(support.TESTFN, 'wb')
762 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
763 self.check_flush_error_on_close(fd, 'wb')
764 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
765 self.check_flush_error_on_close(fd, 'wb', closefd=False)
766 os.close(fd)
767 # text io
768 self.check_flush_error_on_close(support.TESTFN, 'w')
769 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
770 self.check_flush_error_on_close(fd, 'w')
771 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
772 self.check_flush_error_on_close(fd, 'w', closefd=False)
773 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000774
775 def test_multi_close(self):
776 f = self.open(support.TESTFN, "wb", buffering=0)
777 f.close()
778 f.close()
779 f.close()
780 self.assertRaises(ValueError, f.flush)
781
Antoine Pitrou328ec742010-09-14 18:37:24 +0000782 def test_RawIOBase_read(self):
783 # Exercise the default RawIOBase.read() implementation (which calls
784 # readinto() internally).
785 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
786 self.assertEqual(rawio.read(2), b"ab")
787 self.assertEqual(rawio.read(2), b"c")
788 self.assertEqual(rawio.read(2), b"d")
789 self.assertEqual(rawio.read(2), None)
790 self.assertEqual(rawio.read(2), b"ef")
791 self.assertEqual(rawio.read(2), b"g")
792 self.assertEqual(rawio.read(2), None)
793 self.assertEqual(rawio.read(2), b"")
794
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400795 def test_types_have_dict(self):
796 test = (
797 self.IOBase(),
798 self.RawIOBase(),
799 self.TextIOBase(),
800 self.StringIO(),
801 self.BytesIO()
802 )
803 for obj in test:
804 self.assertTrue(hasattr(obj, "__dict__"))
805
Ross Lagerwall59142db2011-10-31 20:34:46 +0200806 def test_opener(self):
807 with self.open(support.TESTFN, "w") as f:
808 f.write("egg\n")
809 fd = os.open(support.TESTFN, os.O_RDONLY)
810 def opener(path, flags):
811 return fd
812 with self.open("non-existent", "r", opener=opener) as f:
813 self.assertEqual(f.read(), "egg\n")
814
Barry Warsaw480e2852016-06-08 17:47:26 -0400815 def test_bad_opener_negative_1(self):
816 # Issue #27066.
817 def badopener(fname, flags):
818 return -1
819 with self.assertRaises(ValueError) as cm:
820 open('non-existent', 'r', opener=badopener)
821 self.assertEqual(str(cm.exception), 'opener returned -1')
822
823 def test_bad_opener_other_negative(self):
824 # Issue #27066.
825 def badopener(fname, flags):
826 return -2
827 with self.assertRaises(ValueError) as cm:
828 open('non-existent', 'r', opener=badopener)
829 self.assertEqual(str(cm.exception), 'opener returned -2')
830
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200831 def test_fileio_closefd(self):
832 # Issue #4841
833 with self.open(__file__, 'rb') as f1, \
834 self.open(__file__, 'rb') as f2:
835 fileio = self.FileIO(f1.fileno(), closefd=False)
836 # .__init__() must not close f1
837 fileio.__init__(f2.fileno(), closefd=False)
838 f1.readline()
839 # .close() must not close f2
840 fileio.close()
841 f2.readline()
842
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300843 def test_nonbuffered_textio(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200844 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300845 with self.assertRaises(ValueError):
846 self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300847
848 def test_invalid_newline(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200849 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300850 with self.assertRaises(ValueError):
851 self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300852
Martin Panter6bb91f32016-05-28 00:41:57 +0000853 def test_buffered_readinto_mixin(self):
854 # Test the implementation provided by BufferedIOBase
855 class Stream(self.BufferedIOBase):
856 def read(self, size):
857 return b"12345"
858 read1 = read
859 stream = Stream()
860 for method in ("readinto", "readinto1"):
861 with self.subTest(method):
862 buffer = byteslike(5)
863 self.assertEqual(getattr(stream, method)(buffer), 5)
864 self.assertEqual(bytes(buffer), b"12345")
865
Ethan Furmand62548a2016-06-04 14:38:43 -0700866 def test_fspath_support(self):
867 class PathLike:
868 def __init__(self, path):
869 self.path = path
870
871 def __fspath__(self):
872 return self.path
873
874 def check_path_succeeds(path):
875 with self.open(path, "w") as f:
876 f.write("egg\n")
877
878 with self.open(path, "r") as f:
879 self.assertEqual(f.read(), "egg\n")
880
881 check_path_succeeds(PathLike(support.TESTFN))
882 check_path_succeeds(PathLike(support.TESTFN.encode('utf-8')))
883
884 bad_path = PathLike(TypeError)
Brett Cannonc78ca1e2016-06-24 12:03:43 -0700885 with self.assertRaises(TypeError):
Ethan Furmand62548a2016-06-04 14:38:43 -0700886 self.open(bad_path, 'w')
887
888 # ensure that refcounting is correct with some error conditions
889 with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
890 self.open(PathLike(support.TESTFN), 'rwxa')
891
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200892
class CIOTest(IOTest):
    # IOTest plus one extra finalization regression test.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        instance = MyIO()
        instance.obj = instance
        ref = weakref.ref(instance)
        # Drop the class first, then the instance, so both are reachable
        # only through the cycle when the collector runs.
        del MyIO
        del instance
        support.gc_collect()
        self.assertIsNone(ref(), ref)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000912
class PyIOTest(IOTest):
    # Inherits every test from IOTest unchanged; adds none of its own.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000915
Guido van Rossuma9e20242007-03-08 00:43:48 +0000916
@support.cpython_only
class APIMismatchTest(unittest.TestCase):
    # Compares the RawIOBase classes of the io and pyio modules in both
    # directions using support.detect_api_mismatch().

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        missing = support.detect_api_mismatch(
            pyio.RawIOBase,
            io.RawIOBase,
            ignore=('__weakref__',),
        )
        self.assertEqual(
            missing,
            set(),
            msg='Python RawIOBase does not have all C RawIOBase methods',
        )

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        missing = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        self.assertEqual(
            missing,
            set(),
            msg='C RawIOBase does not have all Python RawIOBase methods',
        )
930
931
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom.
    # Concrete subclasses provide the buffered type under test as self.tp.

    def test_detach(self):
        underlying = self.MockRawIO()
        buffered = self.tp(underlying)
        # The first detach() hands back the raw stream ...
        self.assertIs(buffered.detach(), underlying)
        # ... and a second one has nothing left to detach.
        with self.assertRaises(ValueError):
            buffered.detach()

        repr(buffered)  # Should still work

    def test_fileno(self):
        # fileno() is expected to report 42 for a MockRawIO-backed object.
        underlying = self.MockRawIO()
        buffered = self.tp(underlying)

        self.assertEqual(42, buffered.fileno())

    def test_invalid_args(self):
        underlying = self.MockRawIO()
        buffered = self.tp(underlying)
        # Invalid whence
        for bad_whence in (-1, 9):
            with self.assertRaises(ValueError):
                buffered.seek(0, bad_whence)

    def test_override_destructor(self):
        # Destroying an instance of a subclass must call the overridden
        # __del__, close and flush, in that order.
        base = self.tp
        calls = []

        class MyBufferedIO(base):
            def __del__(self):
                calls.append(1)
                # The base type may or may not define __del__.
                try:
                    parent_del = super().__del__
                except AttributeError:
                    pass
                else:
                    parent_del()

            def close(self):
                calls.append(2)
                super().close()

            def flush(self):
                calls.append(3)
                super().flush()

        underlying = self.MockRawIO()
        buffered = MyBufferedIO(underlying)
        del buffered
        support.gc_collect()
        self.assertEqual(calls, [1, 2, 3])

    def test_context_manager(self):
        # Test usability as a context manager
        underlying = self.MockRawIO()
        buffered = self.tp(underlying)
        with buffered:
            pass
        # buffered should now be closed, and using it a second time should
        # raise a ValueError.
        with self.assertRaises(ValueError):
            with buffered:
                pass

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        underlying = self.CloseFailureIO()

        def trigger():
            # The buffered object is a temporary; the attribute lookup fails.
            self.tp(underlying).xyzzy

        with support.captured_output("stderr") as captured:
            self.assertRaises(AttributeError, trigger)
        output = captured.getvalue().strip()
        if output:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(output.splitlines()), 1)
            self.assertTrue(output.startswith("Exception OSError: "), output)
            self.assertTrue(output.endswith(" ignored"), output)

    def test_repr(self):
        # repr() shows the qualified type name, plus the raw stream's name
        # once it has one (str or bytes).
        underlying = self.MockRawIO()
        buffered = self.tp(underlying)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
        self.assertEqual(repr(buffered), "<%s>" % clsname)
        underlying.name = "dummy"
        self.assertEqual(repr(buffered), "<%s name='dummy'>" % clsname)
        underlying.name = b"dummy"
        self.assertEqual(repr(buffered), "<%s name=b'dummy'>" % clsname)

    def test_recursive_repr(self):
        # Issue #25455
        underlying = self.MockRawIO()
        buffered = self.tp(underlying)
        # Make the raw stream's name point back at the buffered object.
        with support.swap_attr(underlying, 'name', buffered):
            try:
                repr(buffered)  # Should not crash
            except RuntimeError:
                pass

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        underlying = self.MockRawIO()
        seen_closed = []

        def failing_flush():
            # 'buffered' is bound below, before close() can call this.
            seen_closed[:] = [buffered.closed, underlying.closed]
            raise OSError()

        underlying.flush = failing_flush
        buffered = self.tp(underlying)
        with self.assertRaises(OSError):  # exception not swallowed
            buffered.close()
        self.assertTrue(buffered.closed)
        self.assertTrue(underlying.closed)
        self.assertTrue(seen_closed)      # flush() called
        self.assertFalse(seen_closed[0])  # flush() called before file closed
        self.assertFalse(seen_closed[1])
        underlying.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and the raw close() fail, the close error is
        # raised with the flush error as its __context__.
        underlying = self.MockRawIO()

        def failing_flush():
            raise OSError('flush')

        def failing_close():
            raise OSError('close')

        underlying.close = failing_close
        buffered = self.tp(underlying)
        buffered.flush = failing_flush
        with self.assertRaises(OSError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertEqual(error.args, ('close',))
        self.assertIsInstance(error.__context__, OSError)
        self.assertEqual(error.__context__.args, ('flush',))
        self.assertFalse(buffered.closed)

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        underlying = self.MockRawIO()

        # Both helpers raise NameError by referring to undefined names.
        def failing_flush():
            raise non_existing_flush

        def failing_close():
            raise non_existing_close

        underlying.close = failing_close
        buffered = self.tp(underlying)
        buffered.flush = failing_flush
        with self.assertRaises(NameError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertIn('non_existing_close', str(error))
        self.assertIsInstance(error.__context__, NameError)
        self.assertIn('non_existing_flush', str(error.__context__))
        self.assertFalse(buffered.closed)

    def test_multi_close(self):
        # Repeated close() is harmless; flush() afterwards must fail.
        underlying = self.MockRawIO()
        buffered = self.tp(underlying)
        for _ in range(3):
            buffered.close()
        with self.assertRaises(ValueError):
            buffered.flush()

    def test_unseekable(self):
        # tell() and seek() on an unseekable raw stream are unsupported.
        buffered = self.tp(self.MockUnseekableIO(b"A" * 10))
        with self.assertRaises(self.UnsupportedOperation):
            buffered.tell()
        with self.assertRaises(self.UnsupportedOperation):
            buffered.seek(0)

    def test_readonly_attributes(self):
        # The raw attribute cannot be reassigned.
        underlying = self.MockRawIO()
        buffered = self.tp(underlying)
        replacement = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buffered.raw = replacement
1097
Guido van Rossum78892e42007-04-06 17:31:18 +00001098
class SizeofTest:
    # Mixin checking that sys.getsizeof() accounts for the internal buffer.

    @support.cpython_only
    def test_sizeof(self):
        # The reported size minus the buffer size must be the same for two
        # different buffer sizes.
        small, large = 4096, 8192
        small_buffered = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(small_buffered) - small
        large_buffered = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(large_buffered), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the reported size no longer includes the buffer.
        bufsize = 4096
        buffered = self.tp(self.MockRawIO(), buffer_size=bufsize)
        overhead = sys.getsizeof(buffered) - bufsize
        buffered.close()
        self.assertEqual(sys.getsizeof(buffered), overhead)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001120
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Tests for a buffered reader type; concrete subclasses set ``tp``.
    read_mode = "rb"

    def test_constructor(self):
        # __init__ may be called repeatedly on the same object; invalid
        # buffer sizes must be rejected with ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_uninitialized(self):
        # An object created with __new__ only must be destroyable, must
        # refuse to read, and must work once __init__ has been called.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.read, 0)
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.read(0), b'')

    def test_read(self):
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # rawio._reads is checked after each call to see how many raw reads
        # read1() has triggered so far.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        # A short read only overwrites the first byte of the target.
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")
        rawio = self.MockRawIO((b"abc", None))
        bufio = self.tp(rawio)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"cb")

    def test_readinto1(self):
        # rawio._reads is checked after each call to see how many raw reads
        # have happened so far.
        buffer_size = 10
        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
        bufio = self.tp(rawio, buffer_size=buffer_size)
        b = bytearray(2)
        self.assertEqual(bufio.peek(3), b'abc')
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 1)
        self.assertEqual(b[:1], b"c")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"de")
        self.assertEqual(rawio._reads, 2)
        b = bytearray(2*buffer_size)
        self.assertEqual(bufio.peek(3), b'fgh')
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(bufio.readinto1(b), 6)
        self.assertEqual(b[:6], b"fghjkl")
        self.assertEqual(rawio._reads, 4)

    def _check_readinto_array(self, method_name):
        # Shared body of test_readinto_array and test_readinto1_array.
        # *method_name* names the readinto-style method of the buffered
        # object that is called with a multi-byte-element array as target.
        buffer_size = 60
        data = b"a" * 26
        rawio = self.MockRawIO((data,))
        bufio = self.tp(rawio, buffer_size=buffer_size)

        # Create an array with element size > 1 byte
        b = array.array('i', b'x' * 32)
        # Use a unittest assertion rather than a bare assert so the check
        # is not stripped when running under -O.
        self.assertNotEqual(len(b), 16)

        # Read into it. We should get as many *bytes* as we can fit into b
        # (which is more than the number of elements)
        n = getattr(bufio, method_name)(b)
        self.assertGreater(n, len(b))

        # Check that old contents of b are preserved
        bm = memoryview(b).cast('B')
        self.assertLess(n, len(bm))
        self.assertEqual(bm[:n], data[:n])
        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))

    def test_readinto_array(self):
        self._check_readinto_array("readinto")

    def test_readinto1_array(self):
        self._check_readinto_array("readinto1")

    def test_readlines(self):
        def bufio():
            # Fresh reader over the same three chunks for every call.
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes passed to the buffered read(), and
        # the raw read sizes expected in rawio.read_history.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        # Asking for more than is available returns what there is.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as f:
                f.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=reader) for x in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                # Every byte value must come back exactly N times.
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(combined.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # tell()/seek() are unsupported both before and after a read.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        bufio.read(1)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        self.assertRaises(self.UnsupportedOperation, bufio.tell)

    def test_misbehaved_io(self):
        # seek() and tell() over a MisbehavedRawIO must raise OSError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))

    def test_read_on_closed(self):
        # Issue #23796
        # NOTE(review): this uses io.BufferedReader directly rather than
        # self.tp, so every subclass exercises the same type here --
        # presumably deliberate; confirm before switching to self.tp.
        b = io.BufferedReader(io.BytesIO(b"12"))
        b.read(1)
        b.close()
        self.assertRaises(ValueError, b.peek)
        self.assertRaises(ValueError, b.read1, 1)
1396
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001397
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the reader tests and the sizeof tests against io.BufferedReader.
    tp = io.BufferedReader

    def test_constructor(self):
        super().test_constructor()
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        underlying = self.MockRawIO()
        buffered = self.tp(underlying)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            buffered.__init__(underlying, sys.maxsize)

    def test_initialization(self):
        # After a failed __init__ the object must refuse to read.
        underlying = self.MockRawIO([b"abc"])
        buffered = self.tp(underlying)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                buffered.__init__(underlying, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                buffered.read()

    def test_misbehaved_io_read(self):
        underlying = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        buffered = self.tp(underlying)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        with self.assertRaises(OSError):
            buffered.read(10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        with support.check_warnings(('', ResourceWarning)):
            # The raw file keeps its own local name for the whole test.
            underlying = self.FileIO(support.TESTFN, "w+b")
            buffered = self.tp(underlying)
            # Self-reference so only the cycle collector can reclaim it.
            buffered.loop = buffered
            ref = weakref.ref(buffered)
            del buffered
            support.gc_collect()
        self.assertIsNone(ref(), ref)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1444
1445
class PyBufferedReaderTest(BufferedReaderTest):
    # Run the inherited BufferedReaderTest cases against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001448
Guido van Rossuma9e20242007-03-08 00:43:48 +00001449
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Shared BufferedWriter tests.  Subclasses choose the implementation
    # under test by setting the ``tp`` attribute.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be called again on a live object with valid sizes;
        # invalid sizes raise ValueError and a later valid __init__ makes the
        # object usable again.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_uninitialized(self):
        # An object created with __new__ only can be deleted, rejects
        # write(), and works once __init__ has been called.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.write, b'')
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.write(b''), 0)

    def test_detach_flush(self):
        # detach() pushes pending buffered data to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)
        buffer = bytearray(b"def")
        bufio.write(buffer)
        buffer[:] = b"***"  # Overwrite our copy of the data
        bufio.flush()
        self.assertEqual(b"".join(writer._write_stack), b"abcdef")

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # ``intermediate_func(bufio)`` is called after every write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for _ in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Never write past the end of ``contents``.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Absolute seeks around the current position, ending where we began.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        # Same idea using relative seeks.
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
                         b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Seeking back and overwriting must land at the right raw offsets.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # writelines() accepts a UserList as well as a plain list.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        # Non-bytes items, None and a str all raise TypeError.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)
        self.assertRaises(TypeError, bufio.writelines, 'abc')

    def test_destructor(self):
        # Dropping the last reference flushes buffered data to the raw
        # stream.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        #
        # The test writes a real file; remove it afterwards so nothing is
        # left behind on disk.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # truncate() does not move the stream position.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    # Drain the shared queue, recording any failure so the
                    # main thread can report it.
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for _ in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02)  # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # The chunks may be interleaved, but every byte value must
            # appear exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)
        self.assertRaises(OSError, bufio.write, b"abcdef")

    def test_max_buffer_size_removal(self):
        # A third positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()
        def bad_write(b):
            raise OSError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(OSError, b.close)  # exception not swallowed
        self.assertTrue(b.closed)
1709
Benjamin Peterson59406a92009-03-26 17:10:29 +00001710
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared BufferedWriterTest cases against io.BufferedWriter and
    # adds checks that only apply to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            # Asking for a sys.maxsize-byte buffer must fail cleanly.
            self.assertRaises((OverflowError, MemoryError, ValueError),
                              bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # Re-initializing with an invalid buffer size raises ValueError, and
        # a write() attempted right after each failed __init__ raises too.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # The test creates a real file; remove it afterwards so nothing is
        # left behind on disk.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            f.write(b"123xxx")
            f.x = f  # reference cycle: only the cyclic GC can reclaim it
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275: the TypeError raised for too many constructor
        # arguments must mention the class name.
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1754
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001755
class PyBufferedWriterTest(BufferedWriterTest):
    # Run the inherited BufferedWriterTest cases against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001758
class BufferedRWPairTest(unittest.TestCase):
    # Shared BufferedRWPair tests.  Subclasses choose the implementation
    # under test by setting the ``tp`` attribute.

    def test_constructor(self):
        # A newly constructed pair is not closed.
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.closed)

    def test_uninitialized(self):
        # An object created with __new__ only can be deleted, rejects
        # read() and write(), and works once __init__ has been called.
        cls = self.tp
        discarded = cls.__new__(cls)
        del discarded
        rw_pair = cls.__new__(cls)
        expected_errors = (ValueError, AttributeError)
        expected_text = 'uninitialized|has no attribute'
        reader_method = rw_pair.read
        self.assertRaisesRegex(expected_errors, expected_text,
                               reader_method, 0)
        writer_method = rw_pair.write
        self.assertRaisesRegex(expected_errors, expected_text,
                               writer_method, b'')
        rw_pair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(rw_pair.read(0), b'')
        self.assertEqual(rw_pair.write(b''), 0)

    def test_detach(self):
        # detach() is not supported on a pair.
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        unsupported = self.UnsupportedOperation
        with self.assertRaises(unsupported):
            rw_pair.detach()

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        unreadable = NotReadable()
        writer = self.MockRawIO()
        with self.assertRaises(OSError):
            self.tp(unreadable, writer)

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        reader = self.MockRawIO()
        unwritable = NotWriteable()
        with self.assertRaises(OSError):
            self.tp(reader, unwritable)

    def test_read(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # Successive reads consume the stream in order.
        for call_args, expected in (((3,), b"abc"),
                                    ((1,), b"d"),
                                    ((), b"ef")):
            self.assertEqual(rw_pair.read(*call_args), expected)
        # read(None) returns everything.
        rw_pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(rw_pair.read(None), b"abc")

    def test_readlines(self):
        def fresh_pair():
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

        all_lines = [b"abc\n", b"def\n", b"h"]
        self.assertEqual(fresh_pair().readlines(), all_lines)
        self.assertEqual(fresh_pair().readlines(), all_lines)
        self.assertEqual(fresh_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        first_chunk = rw_pair.read1(3)
        self.assertEqual(first_chunk, b"abc")

    def test_readinto(self):
        for method in ("readinto", "readinto1"):
            with self.subTest(method):
                rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

                target = byteslike(b'\0' * 5)
                fill = getattr(rw_pair, method)
                self.assertEqual(fill(target), 5)
                self.assertEqual(bytes(target), b"abcde")

    def test_write(self):
        sink = self.MockRawIO()
        rw_pair = self.tp(self.MockRawIO(), sink)

        rw_pair.write(b"abc")
        rw_pair.flush()
        payload = bytearray(b"def")
        rw_pair.write(payload)
        payload[:] = b"***"  # Overwrite our copy of the data
        rw_pair.flush()
        self.assertEqual(sink._write_stack, [b"abc", b"def"])

    def test_peek(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() must not consume: the following read() sees the same bytes.
        peeked = rw_pair.peek(3)
        self.assertTrue(peeked.startswith(b"abc"))
        self.assertEqual(rw_pair.read(3), b"abc")

    def test_readable(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw_pair.readable())

    def test_writeable(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw_pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.closed)
        rw_pair.close()
        self.assertTrue(rw_pair.closed)

    def test_reader_close_error_on_close(self):
        # The reader's close() fails: the error propagates, the writer is
        # still closed and the pair reports itself closed.
        def failing_reader_close():
            reader_non_existing
        raw_reader = self.MockRawIO()
        raw_reader.close = failing_reader_close
        raw_writer = self.MockRawIO()
        rw_pair = self.tp(raw_reader, raw_writer)
        with self.assertRaises(NameError) as caught:
            rw_pair.close()
        self.assertIn('reader_non_existing', str(caught.exception))
        self.assertTrue(rw_pair.closed)
        self.assertFalse(raw_reader.closed)
        self.assertTrue(raw_writer.closed)

    def test_writer_close_error_on_close(self):
        # The writer's close() fails: the error propagates, the reader is
        # still closed and the pair does not report itself closed.
        def failing_writer_close():
            writer_non_existing
        raw_reader = self.MockRawIO()
        raw_writer = self.MockRawIO()
        raw_writer.close = failing_writer_close
        rw_pair = self.tp(raw_reader, raw_writer)
        with self.assertRaises(NameError) as caught:
            rw_pair.close()
        self.assertIn('writer_non_existing', str(caught.exception))
        self.assertFalse(rw_pair.closed)
        self.assertTrue(raw_reader.closed)
        self.assertFalse(raw_writer.closed)

    def test_reader_writer_close_error_on_close(self):
        # Both close() calls fail: the reader's error is raised with the
        # writer's error attached as its __context__.
        def failing_reader_close():
            reader_non_existing
        def failing_writer_close():
            writer_non_existing
        raw_reader = self.MockRawIO()
        raw_reader.close = failing_reader_close
        raw_writer = self.MockRawIO()
        raw_writer.close = failing_writer_close
        rw_pair = self.tp(raw_reader, raw_writer)
        with self.assertRaises(NameError) as caught:
            rw_pair.close()
        outer_error = caught.exception
        self.assertIn('reader_non_existing', str(outer_error))
        self.assertIsInstance(outer_error.__context__, NameError)
        self.assertIn('writer_non_existing', str(outer_error.__context__))
        self.assertFalse(rw_pair.closed)
        self.assertFalse(raw_reader.closed)
        self.assertFalse(raw_writer.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # (reader is a tty, writer is a tty, assertion on pair.isatty()):
        # the pair is a tty as soon as either side is.
        scenarios = (
            (False, False, self.assertFalse),
            (True, False, self.assertTrue),
            (False, True, self.assertTrue),
            (True, True, self.assertTrue),
        )
        for reader_tty, writer_tty, check in scenarios:
            rw_pair = self.tp(SelectableIsAtty(reader_tty),
                              SelectableIsAtty(writer_tty))
            check(rw_pair.isatty())

    def test_weakref_clearing(self):
        # Drop the pair first, then the weak reference to it.
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        pair_ref = weakref.ref(rw_pair)
        rw_pair = None
        pair_ref = None  # Shouldn't segfault.
1946
class CBufferedRWPairTest(BufferedRWPairTest):
    # Run the inherited BufferedRWPairTest cases against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001949
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Run the inherited BufferedRWPairTest cases against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001952
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001953
1954class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1955 read_mode = "rb+"
1956 write_mode = "wb+"
1957
1958 def test_constructor(self):
1959 BufferedReaderTest.test_constructor(self)
1960 BufferedWriterTest.test_constructor(self)
1961
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001962 def test_uninitialized(self):
1963 BufferedReaderTest.test_uninitialized(self)
1964 BufferedWriterTest.test_uninitialized(self)
1965
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001966 def test_read_and_write(self):
1967 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001968 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001969
1970 self.assertEqual(b"as", rw.read(2))
1971 rw.write(b"ddd")
1972 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001973 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001974 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001975 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001976
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001977 def test_seek_and_tell(self):
1978 raw = self.BytesIO(b"asdfghjkl")
1979 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001980
Ezio Melottib3aedd42010-11-20 19:04:17 +00001981 self.assertEqual(b"as", rw.read(2))
1982 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001983 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001984 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001985
Antoine Pitroue05565e2011-08-20 14:39:23 +02001986 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001987 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001988 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001989 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001990 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001991 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001992 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001993 self.assertEqual(7, rw.tell())
1994 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001995 rw.flush()
1996 self.assertEqual(b"asdf123fl", raw.getvalue())
1997
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001998 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001999
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002000 def check_flush_and_read(self, read_func):
2001 raw = self.BytesIO(b"abcdefghi")
2002 bufio = self.tp(raw)
2003
Ezio Melottib3aedd42010-11-20 19:04:17 +00002004 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002005 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002006 self.assertEqual(b"ef", read_func(bufio, 2))
2007 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002008 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002009 self.assertEqual(6, bufio.tell())
2010 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002011 raw.seek(0, 0)
2012 raw.write(b"XYZ")
2013 # flush() resets the read buffer
2014 bufio.flush()
2015 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002016 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002017
2018 def test_flush_and_read(self):
2019 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2020
2021 def test_flush_and_readinto(self):
2022 def _readinto(bufio, n=-1):
2023 b = bytearray(n if n >= 0 else 9999)
2024 n = bufio.readinto(b)
2025 return bytes(b[:n])
2026 self.check_flush_and_read(_readinto)
2027
2028 def test_flush_and_peek(self):
2029 def _peek(bufio, n=-1):
2030 # This relies on the fact that the buffer can contain the whole
2031 # raw stream, otherwise peek() can return less.
2032 b = bufio.peek(n)
2033 if n != -1:
2034 b = b[:n]
2035 bufio.seek(len(b), 1)
2036 return b
2037 self.check_flush_and_read(_peek)
2038
2039 def test_flush_and_write(self):
2040 raw = self.BytesIO(b"abcdefghi")
2041 bufio = self.tp(raw)
2042
2043 bufio.write(b"123")
2044 bufio.flush()
2045 bufio.write(b"45")
2046 bufio.flush()
2047 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002048 self.assertEqual(b"12345fghi", raw.getvalue())
2049 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002050
2051 def test_threads(self):
2052 BufferedReaderTest.test_threads(self)
2053 BufferedWriterTest.test_threads(self)
2054
2055 def test_writes_and_peek(self):
2056 def _peek(bufio):
2057 bufio.peek(1)
2058 self.check_writes(_peek)
2059 def _peek(bufio):
2060 pos = bufio.tell()
2061 bufio.seek(-1, 1)
2062 bufio.peek(1)
2063 bufio.seek(pos, 0)
2064 self.check_writes(_peek)
2065
2066 def test_writes_and_reads(self):
2067 def _read(bufio):
2068 bufio.seek(-1, 1)
2069 bufio.read(1)
2070 self.check_writes(_read)
2071
2072 def test_writes_and_read1s(self):
2073 def _read1(bufio):
2074 bufio.seek(-1, 1)
2075 bufio.read1(1)
2076 self.check_writes(_read1)
2077
2078 def test_writes_and_readintos(self):
2079 def _read(bufio):
2080 bufio.seek(-1, 1)
2081 bufio.readinto(bytearray(1))
2082 self.check_writes(_read)
2083
Antoine Pitroua0ceb732009-08-06 20:29:56 +00002084 def test_write_after_readahead(self):
2085 # Issue #6629: writing after the buffer was filled by readahead should
2086 # first rewind the raw stream.
2087 for overwrite_size in [1, 5]:
2088 raw = self.BytesIO(b"A" * 10)
2089 bufio = self.tp(raw, 4)
2090 # Trigger readahead
2091 self.assertEqual(bufio.read(1), b"A")
2092 self.assertEqual(bufio.tell(), 1)
2093 # Overwriting should rewind the raw stream if it needs so
2094 bufio.write(b"B" * overwrite_size)
2095 self.assertEqual(bufio.tell(), overwrite_size + 1)
2096 # If the write size was smaller than the buffer size, flush() and
2097 # check that rewind happens.
2098 bufio.flush()
2099 self.assertEqual(bufio.tell(), overwrite_size + 1)
2100 s = raw.getvalue()
2101 self.assertEqual(s,
2102 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2103
Antoine Pitrou7c404892011-05-13 00:13:33 +02002104 def test_write_rewind_write(self):
2105 # Various combinations of reading / writing / seeking backwards / writing again
2106 def mutate(bufio, pos1, pos2):
2107 assert pos2 >= pos1
2108 # Fill the buffer
2109 bufio.seek(pos1)
2110 bufio.read(pos2 - pos1)
2111 bufio.write(b'\x02')
2112 # This writes earlier than the previous write, but still inside
2113 # the buffer.
2114 bufio.seek(pos1)
2115 bufio.write(b'\x01')
2116
2117 b = b"\x80\x81\x82\x83\x84"
2118 for i in range(0, len(b)):
2119 for j in range(i, len(b)):
2120 raw = self.BytesIO(b)
2121 bufio = self.tp(raw, 100)
2122 mutate(bufio, i, j)
2123 bufio.flush()
2124 expected = bytearray(b)
2125 expected[j] = 2
2126 expected[i] = 1
2127 self.assertEqual(raw.getvalue(), expected,
2128 "failed result for i=%d, j=%d" % (i, j))
2129
def test_truncate_after_read_or_write(self):
    # truncate() with no argument must use the logical position, both
    # after a buffered read and after a buffered (unflushed) write.
    stream = self.tp(self.BytesIO(b"A" * 10), 100)
    # The read buffer gets filled.
    self.assertEqual(stream.read(2), b"AA")
    self.assertEqual(stream.truncate(), 2)
    # The write buffer increases.
    self.assertEqual(stream.write(b"BB"), 2)
    self.assertEqual(stream.truncate(), 4)
2137
def test_misbehaved_io(self):
    # Run the reader-side check first, then the writer-side one, against
    # this class's buffered type.
    for base in (BufferedReaderTest, BufferedWriterTest):
        base.test_misbehaved_io(self)
2141
def test_interleaved_read_write(self):
    # Test for issue #12213: alternating writes with each kind of read
    # must keep a single consistent stream position.
    with self.BytesIO(b'abcdefgh') as raw, self.tp(raw, 100) as bufio:
        bufio.write(b"1")
        self.assertEqual(bufio.read(1), b'b')
        bufio.write(b'2')
        self.assertEqual(bufio.read1(1), b'd')
        bufio.write(b'3')
        target = bytearray(1)
        bufio.readinto(target)
        self.assertEqual(target, b'f')
        bufio.write(b'4')
        self.assertEqual(bufio.peek(1), b'h')
        bufio.flush()
        self.assertEqual(raw.getvalue(), b'1b2d3f4h')

    # Same idea, but starting with a read instead of a write.
    with self.BytesIO(b'abc') as raw, self.tp(raw, 100) as bufio:
        self.assertEqual(bufio.read(1), b'a')
        bufio.write(b"2")
        self.assertEqual(bufio.read(1), b'c')
        bufio.flush()
        self.assertEqual(raw.getvalue(), b'a2c')
2166
def test_interleaved_readline_write(self):
    # Each step overwrites one byte and then reads the rest of the
    # current line; the writes must land exactly where the previous
    # readline() stopped.
    steps = (
        (b'1', b'b\n'),
        (b'2', b'def\n'),
        (b'3', b'\n'),
    )
    with self.BytesIO(b'ab\ncdef\ng\n') as raw, self.tp(raw) as bufio:
        for written, rest_of_line in steps:
            bufio.write(written)
            self.assertEqual(bufio.readline(), rest_of_line)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2178
# You can't construct a BufferedRandom over a non-seekable stream.
# Binding the name to None replaces any test method of that name that this
# class would otherwise pick up (presumably an inherited one; the base
# classes are not visible here), so the test loader has nothing to run.
test_unseekable = None
2181
R David Murray67bfe802013-02-23 21:51:05 -05002182
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Runs the shared BufferedRandom tests against io.BufferedRandom and
    # adds a few checks specific to that implementation.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-initialising with an absurd buffer size must fail cleanly.
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            bufio.__init__(rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Reuse the reader check first, then the writer check.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275: the TypeError for too many arguments must name
        # the class being constructed.
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
2204
2205
# Runs the shared BufferedRandomTest cases against pyio.BufferedRandom
# (pyio is presumably the pure-Python io implementation; it is imported
# outside this section).
class PyBufferedRandomTest(BufferedRandomTest):
    tp = pyio.BufferedRandom
2208
2209
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002210# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2211# properties:
2212# - A single output character can correspond to many bytes of input.
2213# - The number of input bytes to complete the character can be
2214# undetermined until the last input byte is received.
2215# - The number of input bytes can vary depending on previous input.
2216# - A single input byte can correspond to many characters of output.
2217# - The number of output characters can be undetermined until the
2218# last input byte is received.
2219# - The number of output characters can vary depending on previous input.
2220
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  A change of I applies to the bytes
    that follow it.  EOF also terminates the last word.
    """

    # The codec lookup function below only answers while this is true,
    # so the decoder stays invisible unless a test switches it on.
    codecEnabled = False

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and nothing buffered."""
        self.buffer = bytearray()
        self.i = self.o = 1

    def getstate(self):
        """Return (buffered bytes, flags) with I and O packed in flags."""
        # XOR with 1 so that flags = 0 after reset().
        packed_i = self.i ^ 1
        packed_o = self.o ^ 1
        return bytes(self.buffer), packed_i * 100 + packed_o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        pending, flags = state
        self.buffer = bytearray(pending)
        packed_i, packed_o = divmod(flags, 100)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        """Feed bytes to the decoder and return the text completed so far."""
        period = ord('.')
        pieces = []
        for byte in input:
            if self.i == 0:
                # Variable-length mode: a period ends the word, and a
                # period with nothing buffered is ignored.
                if byte != period:
                    self.buffer.append(byte)
                elif self.buffer:
                    pieces.append(self.process_word())
                continue
            # Fixed-length mode: the word ends after self.i bytes.
            self.buffer.append(byte)
            if len(self.buffer) == self.i:
                pieces.append(self.process_word())
        if final and self.buffer:
            # EOF terminates the last word.
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return the text it produces."""
        lead = self.buffer[0]
        if lead == ord('i'):
            # Set the input length; control words produce no output.
            self.i = min(99, int(self.buffer[1:] or 0))
            text = ''
        elif lead == ord('o'):
            # Set the output length; control words produce no output.
            self.o = min(99, int(self.buffer[1:] or 0))
            text = ''
        else:
            text = self.buffer.decode('ascii')
            if self.o:
                # Pad out with hyphens, then truncate to the output length.
                text = text.ljust(self.o, '-')[:self.o]
            text += '.'
        self.buffer = bytearray()
        return text

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: answer only for 'test_decoder' when enabled."""
        if not cls.codecEnabled or name != 'test_decoder':
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)
2305
# Register the previous decoder for testing.
# Disabled by default, tests will enable it: lookupTestDecoder only returns
# a CodecInfo while StatefulIncrementalDecoder.codecEnabled is true, and
# only for the name 'test_decoder'.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2309
2310
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag passed to decode(), expected
    # text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.'),
    ]

    def test_decoder(self):
        # Try a few one-shot test cases, each on a fresh decoder.
        for data, final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, final), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', True), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002353
2354class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002355
def setUp(self):
    # Sample bytes mixing \r\n, \r and \n line endings, plus the text
    # they correspond to once every ending is normalised to \n.
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    normalized_bytes = b"AAA\nBBB\nCCC\nDDD\nEEE\n"
    self.normalized = normalized_bytes.decode("ascii")
    # Start each test without a leftover scratch file.
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002360
def tearDown(self):
    # Remove the scratch file a test may have created.
    scratch = support.TESTFN
    support.unlink(scratch)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002363
def test_constructor(self):
    # Calling __init__ again on an existing wrapper must reconfigure it,
    # and bad newline arguments must be rejected.
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    wrapper = self.TextIOWrapper(buffered)

    wrapper.__init__(buffered, encoding="latin-1", newline="\r\n")
    self.assertEqual(wrapper.encoding, "latin-1")
    self.assertEqual(wrapper.line_buffering, False)

    wrapper.__init__(buffered, encoding="utf-8", line_buffering=True)
    self.assertEqual(wrapper.encoding, "utf-8")
    self.assertEqual(wrapper.line_buffering, True)
    # The two leading bytes are one character once decoded as UTF-8.
    self.assertEqual("\xe9\n", wrapper.readline())

    with self.assertRaises(TypeError):
        wrapper.__init__(buffered, newline=42)
    with self.assertRaises(ValueError):
        wrapper.__init__(buffered, newline='xyzzy')
2377
def test_uninitialized(self):
    # An object created with __new__ but never initialised must be safe
    # to destroy and must fail cleanly when used.
    cls = self.TextIOWrapper
    wrapper = cls.__new__(cls)
    del wrapper
    wrapper = cls.__new__(cls)
    with self.assertRaises(Exception):
        repr(wrapper)
    with self.assertRaisesRegex((ValueError, AttributeError),
                                'uninitialized|has no attribute'):
        wrapper.read(0)
    # Once initialised the same object is usable.
    wrapper.__init__(self.MockRawIO())
    self.assertEqual(wrapper.read(0), '')
2388
def test_non_text_encoding_codecs_are_rejected(self):
    # Ensure the constructor complains if passed a codec that isn't
    # marked as a text encoding
    # http://bugs.python.org/issue20404
    buffered = self.BufferedWriter(self.BytesIO())
    expected_message = "is not a text encoding"
    with self.assertRaisesRegex(LookupError, expected_message):
        self.TextIOWrapper(buffered, encoding="hex")
2397
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    wrapper = self.TextIOWrapper(buffered)
    # detach() hands back the underlying buffer object itself.
    self.assertIs(wrapper.detach(), buffered)

    wrapper = self.TextIOWrapper(buffered, encoding="ascii")
    wrapper.write("howdy")
    # Nothing has reached the raw stream yet...
    self.assertFalse(raw.getvalue())
    # ...but detaching pushes the pending text through.
    wrapper.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    # A second detach() has nothing left to detach.
    with self.assertRaises(ValueError):
        wrapper.detach()

    # Operations independent of the detached stream should still work
    repr(wrapper)
    self.assertEqual(wrapper.encoding, "ascii")
    self.assertEqual(wrapper.errors, "strict")
    self.assertFalse(wrapper.line_buffering)
2416
def test_repr(self):
    # repr() gains name= and mode= fields as those attributes appear.
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    wrapper = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    expected = "<%s.TextIOWrapper encoding='utf-8'>" % modname
    self.assertEqual(repr(wrapper), expected)

    raw.name = "dummy"
    expected = "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(wrapper), expected)

    wrapper.mode = "r"
    expected = ("<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>"
                % modname)
    self.assertEqual(repr(wrapper), expected)

    # A bytes name is shown with its b'' prefix.
    raw.name = b"dummy"
    expected = ("<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>"
                % modname)
    self.assertEqual(repr(wrapper), expected)

    wrapper.buffer.detach()
    repr(wrapper)  # Should not raise an exception
2436
def test_recursive_repr(self):
    # Issue #25455: a wrapper whose stream's name is the wrapper itself
    # makes repr() recurse; that may raise RuntimeError but must not
    # crash the interpreter.
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw)
    with support.swap_attr(raw, 'name', wrapper):
        try:
            repr(wrapper)
        except RuntimeError:
            # A clean recursion error is an acceptable outcome.
            pass
2446
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    wrapper = self.TextIOWrapper(buffered, newline="\n",
                                 line_buffering=True)
    # (text written, raw contents expected right afterwards)
    steps = (
        ("X", b""),                  # No flush happened
        ("Y\nZ", b"XY\nZ"),          # All got flushed
        ("A\rB", b"XY\nZA\rB"),
    )
    for text, flushed in steps:
        wrapper.write(text)
        self.assertEqual(raw.getvalue(), flushed)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002457
def test_default_encoding(self):
    saved_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding to check that TextIOWrapper() uses the current
        # locale encoding and not the user preferred encoding
        for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(key, None)

        current_locale_encoding = locale.getpreferredencoding(False)
        wrapper = self.TextIOWrapper(self.BytesIO())
        self.assertEqual(wrapper.encoding, current_locale_encoding)
    finally:
        # Put the environment back exactly as it was found.
        os.environ.clear()
        os.environ.update(saved_environ)
2475
@support.cpython_only
def test_device_encoding(self):
    # Issue 15989: a fileno() result that does not fit in a C int must
    # raise OverflowError from the constructor.
    import _testcapi
    raw = self.BytesIO()
    for limit in (_testcapi.INT_MAX, _testcapi.UINT_MAX):
        raw.fileno = lambda limit=limit: limit + 1
        self.assertRaises(OverflowError, self.TextIOWrapper, raw)
2485
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()
    explicit = self.TextIOWrapper(raw, encoding="utf-8")
    self.assertEqual(explicit.encoding, "utf-8")
    # With no encoding argument some encoding is still chosen, and it
    # must be a name the codec registry knows (lookup raises otherwise).
    explicit = self.TextIOWrapper(raw)
    chosen = explicit.encoding
    self.assertIsNotNone(chosen)
    codecs.lookup(chosen)
2494
def test_encoding_errors_reading(self):
    undecodable = b"abc\n\xff\n"
    # (1) default and (2) explicit strict: the bad byte raises.
    for extra in ({}, {"errors": "strict"}):
        raw = self.BytesIO(undecodable)
        wrapper = self.TextIOWrapper(raw, encoding="ascii", **extra)
        self.assertRaises(UnicodeError, wrapper.read)
    # (3) ignore drops the bad byte, (4) replace substitutes U+FFFD.
    lenient = (
        ("ignore", "abc\n\n"),
        ("replace", "abc\n\ufffd\n"),
    )
    for handler, expected in lenient:
        raw = self.BytesIO(undecodable)
        wrapper = self.TextIOWrapper(raw, encoding="ascii", errors=handler)
        self.assertEqual(wrapper.read(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002512
def test_encoding_errors_writing(self):
    # (1) default and (2) explicit strict: unencodable text raises.
    for extra in ({}, {"errors": "strict"}):
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding="ascii", **extra)
        self.assertRaises(UnicodeError, wrapper.write, "\xff")
    # (3) ignore drops the character, (4) replace writes "?" instead.
    lenient = (
        ("ignore", b"abcdef\n"),
        ("replace", b"abc?def\n"),
    )
    for handler, expected in lenient:
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding="ascii", errors=handler,
                                     newline="\n")
        wrapper.write("abc\xffdef\n")
        wrapper.flush()
        self.assertEqual(raw.getvalue(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002536
def test_newlines(self):
    # Read the same mixed-newline text under every newline= mode, in
    # several encodings and with tiny buffers, both by iterating lines
    # and by mixing read(2) with readline().
    input_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]

    # [newline argument, lines expected back]
    tests = [
        [None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']],
        ['', input_lines],
        ['\n', ["unix\n", "windows\r\n", "os9\rlast\n", "nonl"]],
        ['\r\n', ["unix\nwindows\r\n", "os9\rlast\nnonl"]],
        ['\r', ["unix\nwindows\r", "\nos9\r", "last\nnonl"]],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    def read_in_pairs(textio):
        # Take two characters with read(), then the rest of the line
        # with readline(), until the stream is exhausted.
        lines = []
        while True:
            head = textio.read(2)
            if head == '':
                return lines
            self.assertEqual(len(head), 2)
            lines.append(head + textio.readline())

    joined = ''.join(input_lines)
    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        data = joined.encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        got_lines = read_in_pairs(textio)
                    else:
                        got_lines = list(textio)

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002578
def test_newlines_input(self):
    # readlines() and read() must agree with each other for every
    # newline= mode.
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    cases = [
        (None, normalized.decode("ascii").splitlines(keepends=True)),
        ("", testdata.decode("ascii").splitlines(keepends=True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected_lines in cases:
        raw = self.BytesIO(testdata)
        wrapper = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        self.assertEqual(wrapper.readlines(), expected_lines)
        # Rewind and check that a single read() returns the same text.
        wrapper.seek(0)
        self.assertEqual(wrapper.read(), "".join(expected_lines))
Guido van Rossum8358db22007-08-18 21:39:55 +00002594
def test_newlines_output(self):
    # Expected raw bytes for each explicit newline= argument.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None must behave like the platform's own line separator.
    tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
    chunks = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in tests:
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        for chunk in chunks:
            wrapper.write(chunk)
        wrapper.flush()
        self.assertEqual(raw.closed, False)
        self.assertEqual(raw.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002612
def test_destructor(self):
    # Dropping the last reference to a wrapper must flush pending text
    # and close the underlying stream.
    seen_at_close = []
    base = self.BytesIO

    class RecordingBytesIO(base):
        def close(self):
            # Capture the contents while they can still be read.
            seen_at_close.append(self.getvalue())
            base.close(self)

    raw = RecordingBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002626
def test_override_destructor(self):
    # Subclass overrides of __del__, close() and flush() must all be
    # honoured at destruction, in that order.
    calls = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The parent class may not define __del__ at all.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    wrapper = MyTextIO(self.BytesIO(), encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2649
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        # The temporary wrapper is destroyed while AttributeError is
        # propagating.
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    report = captured.getvalue().strip()
    if not report:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(report.splitlines()), 1)
    self.assertTrue(report.startswith("Exception OSError: "), report)
    self.assertTrue(report.endswith(" ignored"), report)
Guido van Rossum8358db22007-08-18 21:39:55 +00002664
Guido van Rossum9b76da62007-04-11 01:09:03 +00002665 # Systematic tests of the text I/O API
2666
def test_basic_io(self):
    # Write, read, seek and tell on a real file for a range of chunk
    # sizes and encodings.  Both files are opened in "with" blocks so
    # they are closed even when an assertion fails; otherwise a failure
    # would leak the handle and could keep tearDown from unlinking the
    # file.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in ("ascii", "latin-1", "utf-8"):
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Remember the end-of-file cookie for later seeks.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                # Append, then read the new text back via the cookie.
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2695
def multi_line_test(self, f, enc):
    # Helper: empty the file, write lines of many different lengths while
    # recording tell() before each one, then read them back and check
    # that both the positions and the lines match.  enc is not used.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    period = len(sample)
    lengths = (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000)

    written = []
    for length in lengths:
        # Cycle through the sample characters to build the line.
        text = "".join(sample[k % period] for k in range(length)) + "\n"
        written.append((f.tell(), text))
        f.write(text)

    f.seek(0)
    read_back = []
    pos = f.tell()
    text = f.readline()
    while text:
        read_back.append((pos, text))
        pos = f.tell()
        text = f.readline()
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002717
def test_telling(self):
    # tell() must report the same cookies while reading as it did while
    # writing, and must refuse to work in the middle of line iteration.
    # The file is opened in a "with" block so it is closed even when an
    # assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() raises while the file is being iterated.
            self.assertRaises(OSError, f.tell)
        # Once iteration has finished, tell() works again.
        self.assertEqual(f.tell(), p2)
2737
def test_seeking(self):
    # Build a line whose multi-byte character sits two bytes before the
    # default chunk size, then check read(), tell() and readline()
    # around it.
    prefix_size = _default_chunk_size() - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    # The prefix is pure ASCII, so characters and bytes line up.
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    line = prefix + u_suffix.encode("utf-8")
    with self.open(support.TESTFN, "wb") as binary_file:
        binary_file.write(line * 2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as text_file:
        head = text_file.read(prefix_size)
        self.assertEqual(head, str(prefix, "ascii"))
        self.assertEqual(text_file.tell(), prefix_size)
        self.assertEqual(text_file.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002754
def test_seeking_too(self):
    # Regression test for a specific bug: tell() after readline() when
    # the chunk size is smaller than one encoded character.
    three_byte_char_line = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as binary_file:
        binary_file.write(three_byte_char_line)
    with self.open(support.TESTFN, "r", encoding="utf-8") as text_file:
        text_file._CHUNK_SIZE  # Just test that it exists
        text_file._CHUNK_SIZE = 2
        text_file.readline()
        # Must not raise.
        text_file.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002765
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file below is managed by "with" so that a failing
        # assertion cannot leave the test file open.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Going back to the cookie must replay the same tail.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(case_input)

        # Position each test case so that it crosses a chunk boundary.
        for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(case_input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + case_input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002812
def test_encoded_writes(self):
    # Two consecutive writes through a UTF-16/UTF-32 wrapper must read
    # back as the doubled text and match a single encode() of it.
    text = "1234567890"
    doubled = text * 2
    codec_names = (
        "utf-16",
        "utf-16-le",
        "utf-16-be",
        "utf-32",
        "utf-32-le",
        "utf-32-be",
    )
    for codec_name in codec_names:
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding=codec_name)
        # Check if the BOM is written only once (see issue1753).
        for _ in range(2):
            wrapper.write(text)
        # Read the whole stream back twice, rewinding before each pass.
        for _ in range(2):
            wrapper.seek(0)
            self.assertEqual(wrapper.read(), doubled)
        self.assertEqual(raw.getvalue(), doubled.encode(codec_name))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002832
def test_unreadable(self):
    # read() on a wrapper whose buffer reports readable() == False
    # must raise OSError.
    class UnReadable(self.BytesIO):
        def readable(self):
            return False

    wrapper = self.TextIOWrapper(UnReadable())
    with self.assertRaises(OSError):
        wrapper.read()
Benjamin Petersona1b49012009-03-31 23:11:32 +00002839
def test_read_one_by_one(self):
    # Read b"AA\r\nBB" a single character at a time; the joined result
    # must contain "\n" where the input had "\r\n".
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    # iter() with a sentinel stops at the first empty read.
    collected = "".join(iter(lambda: wrapper.read(1), ""))
    self.assertEqual(collected, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002849
def test_readlines(self):
    # readlines() with no hint, with a None hint and with a hint of 5.
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]
    self.assertEqual(wrapper.readlines(), every_line)
    wrapper.seek(0)
    self.assertEqual(wrapper.readlines(None), every_line)
    wrapper.seek(0)
    # With the hint of 5 only the first two lines are expected back.
    self.assertEqual(wrapper.readlines(5), every_line[:2])
2857
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002858 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    filler = b"A" * 127
    wrapper = self.TextIOWrapper(self.BytesIO(filler + b"\r\nB"))
    # Pull 128 characters per call until an empty string comes back.
    collected = "".join(iter(lambda: wrapper.read(128), ""))
    self.assertEqual(collected, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002869
def test_writelines(self):
    # writelines() on a plain list writes the items back to back.
    pieces = ['ab', 'cd', 'ef']
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw)
    wrapper.writelines(pieces)
    # Flush so the bytes are visible through raw.getvalue().
    wrapper.flush()
    self.assertEqual(raw.getvalue(), b'abcdef')
2877
def test_writelines_userlist(self):
    # writelines() must also accept a UserList, not only a real list.
    pieces = UserList(['ab', 'cd', 'ef'])
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw)
    wrapper.writelines(pieces)
    # Flush so the bytes are visible through raw.getvalue().
    wrapper.flush()
    self.assertEqual(raw.getvalue(), b'abcdef')
2885
def test_writelines_error(self):
    # writelines() must raise TypeError for a list of ints, for None
    # and for a bytes object.
    wrapper = self.TextIOWrapper(self.BytesIO())
    for bad_argument in ([1, 2, 3], None, b'abc'):
        with self.assertRaises(TypeError):
            wrapper.writelines(bad_argument)
2891
def test_issue1395_1(self):
    # Reading self.testdata one char at a time must give self.normalized.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")

    # read one char at a time, stopping at the first empty read
    collected = "".join(iter(lambda: wrapper.read(1), ""))
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002903
def test_issue1395_2(self):
    # Reading 4 chars at a time with a chunk size of 4 must give
    # self.normalized.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # Keep reading until an empty string comes back.
    collected = "".join(iter(lambda: wrapper.read(4), ""))
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002915
def test_issue1395_3(self):
    # Two read(4) calls followed by three readline() calls, with a
    # chunk size of 4, must add up to self.normalized.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # List items are evaluated left to right, so the call order is fixed.
    pieces = [
        wrapper.read(4),
        wrapper.read(4),
        wrapper.readline(),
        wrapper.readline(),
        wrapper.readline(),
    ]
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002926
def test_issue1395_4(self):
    # read(4) followed by read() of the rest, with a chunk size of 4,
    # must add up to self.normalized.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    head = wrapper.read(4)
    tail = wrapper.read()
    self.assertEqual(head + tail, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002934
def test_issue1395_5(self):
    # After read(4), seeking to 0 and then back to the tell() cookie
    # must leave the next read(4) returning "BBB\n".
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    wrapper.read(4)  # result unused; only advances the stream
    bookmark = wrapper.tell()
    wrapper.seek(0)
    wrapper.seek(bookmark)
    self.assertEqual(wrapper.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002944
def test_issue2282(self):
    # The wrapper must report the same seekable() value as its buffer.
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    self.assertEqual(raw.seekable(), wrapper.seekable())
2950
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN
    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as writer:
            writer.write('aaa')
            writer.tell()  # result unused; the call itself is kept
        with self.open(path, 'rb') as reader:
            self.assertEqual(reader.read(), 'aaa'.encode(codec))

        with self.open(path, 'a', encoding=codec) as appender:
            appender.write('xxx')
        # The file must equal a single encode() of the combined text.
        with self.open(path, 'rb') as reader:
            self.assertEqual(reader.read(), 'aaaxxx'.encode(codec))
Antoine Pitroue4501852009-05-14 18:55:55 +00002965
def test_seek_bom(self):
    # The BOM is not written again when seeking manually: write 'aaa',
    # reopen in 'r+', write 'zzz' at the saved end position, then
    # overwrite from position 0 with 'bbb'.
    path = support.TESTFN
    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as writer:
            writer.write('aaa')
            end_cookie = writer.tell()
        with self.open(path, 'r+', encoding=codec) as editor:
            editor.seek(end_cookie)
            editor.write('zzz')
            editor.seek(0)
            editor.write('bbb')
        # The file must equal a single encode() of the final text.
        with self.open(path, 'rb') as reader:
            self.assertEqual(reader.read(), 'bbbzzz'.encode(codec))
Antoine Pitroue4501852009-05-14 18:55:55 +00002980
def test_seek_append_bom(self):
    # The BOM is not written again when appending after first seeking
    # to the start and then to the end.
    path = support.TESTFN
    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as writer:
            writer.write('aaa')
        with self.open(path, 'a', encoding=codec) as appender:
            appender.seek(0)
            appender.seek(0, self.SEEK_END)
            appender.write('xxx')
        # The file must equal a single encode() of the combined text.
        with self.open(path, 'rb') as reader:
            self.assertEqual(reader.read(), 'aaaxxx'.encode(codec))
2993
def test_errors_property(self):
    # The "errors" attribute is "strict" when open() is given no errors
    # argument, and otherwise echoes the value passed in.
    path = support.TESTFN
    with self.open(path, "w") as default_file:
        self.assertEqual(default_file.errors, "strict")
    with self.open(path, "w", errors="replace") as replacing_file:
        self.assertEqual(replacing_file.errors, "replace")
2999
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    worker_count = 20
    start_signal = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as out:
        def run(n):
            # Build the line first, then block until the event is set.
            line = "Thread%03d\n" % n
            start_signal.wait()
            out.write(line)

        workers = []
        for index in range(worker_count):
            workers.append(threading.Thread(target=run, args=(index,)))
        # NOTE(review): start_threads presumably starts and joins the
        # workers and calls start_signal.set to release them; confirm
        # against test.support.
        with support.start_threads(workers, start_signal.set):
            time.sleep(0.02)
    with self.open(support.TESTFN) as result:
        written = result.read()
        # Each thread's line must appear exactly once.
        for index in range(worker_count):
            self.assertEqual(written.count("Thread%03d\n" % index), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003018
def test_flush_error_on_close(self):
    # Test that text file is closed despite failed flush
    # and that flush() is called before file closed.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    seen_closed = []

    def bad_flush():
        # Record both "closed" flags as they stand when flush() runs.
        seen_closed[:] = [wrapper.closed, wrapper.buffer.closed]
        raise OSError()

    wrapper.flush = bad_flush
    with self.assertRaises(OSError):  # exception not swallowed
        wrapper.close()
    self.assertTrue(wrapper.closed)
    self.assertTrue(wrapper.buffer.closed)
    self.assertTrue(seen_closed)  # flush() called
    text_was_closed, buffer_was_closed = seen_closed[0], seen_closed[1]
    self.assertFalse(text_was_closed)  # flush() called before file closed
    self.assertFalse(buffer_was_closed)
    wrapper.flush = lambda: None  # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003035
def test_close_error_on_close(self):
    # When both flush() and the buffer's close() fail, close() must
    # raise the close error with the flush error as its __context__.
    def failing(label):
        # Build a zero-argument callable that raises OSError(label).
        def _raise():
            raise OSError(label)
        return _raise

    raw = self.BytesIO(self.testdata)
    raw.close = failing('close')
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.flush = failing('flush')
    with self.assertRaises(OSError) as caught:  # exception not swallowed
        wrapper.close()
    error = caught.exception
    self.assertEqual(error.args, ('close',))
    self.assertIsInstance(error.__context__, OSError)
    self.assertEqual(error.__context__.args, ('flush',))
    self.assertFalse(wrapper.closed)
3051
def test_nonnormalized_close_error_on_close(self):
    # Issue #21677
    # Both replacements fail with a NameError, raised by referring to a
    # name that is not defined anywhere.
    def bad_flush():
        raise non_existing_flush

    def bad_close():
        raise non_existing_close

    raw = self.BytesIO(self.testdata)
    raw.close = bad_close
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.flush = bad_flush
    with self.assertRaises(NameError) as caught:  # exception not swallowed
        wrapper.close()
    error = caught.exception
    self.assertIn('non_existing_close', str(error))
    self.assertIsInstance(error.__context__, NameError)
    self.assertIn('non_existing_flush', str(error.__context__))
    self.assertFalse(wrapper.closed)
3068
def test_multi_close(self):
    # close() may be called repeatedly; flush() afterwards must raise
    # ValueError.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    for _ in range(3):
        wrapper.close()
    with self.assertRaises(ValueError):
        wrapper.flush()
3075
def test_unseekable(self):
    # tell() and seek() on a wrapper around MockUnseekableIO must raise
    # UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    expected_error = self.UnsupportedOperation
    with self.assertRaises(expected_error):
        wrapper.tell()
    with self.assertRaises(expected_error):
        wrapper.seek(0)
3080
def test_readonly_attributes(self):
    # Assigning to the wrapper's "buffer" attribute must raise
    # AttributeError.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    with self.assertRaises(AttributeError):
        wrapper.buffer = replacement
3086
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks),
                                 encoding='ascii', newline='\n')
    # Reads: a fixed-size read, one line, then iteration over the rest.
    first_four = wrapper.read(4)
    self.assertEqual(first_four, 'abcd')
    next_line = wrapper.readline()
    self.assertEqual(next_line, 'efghi\n')
    remaining = list(wrapper)
    self.assertEqual(remaining, ['jkl\n', 'opq\n'])
3097
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for fragment in ('1', '23\n4', '5'):
        wrapper.write(fragment)
    # No flush() was called, yet the raw object must hold every byte.
    written = b''.join(raw._write_stack)
    self.assertEqual(written, b'123\n45')
3107
def test_bufio_write_through(self):
    # Issue #21396: write_through=True doesn't force a flush()
    # on the underlying binary buffered object.
    flush_log = []
    write_log = []

    class BufferedWriter(self.BufferedWriter):
        # Record each flush()/write() call before delegating to the base.
        def flush(self, *args, **kwargs):
            flush_log.append(True)
            return super().flush(*args, **kwargs)

        def write(self, *args, **kwargs):
            write_log.append(True)
            return super().write(*args, **kwargs)

    raw = self.BytesIO()
    payload = b"a"
    buffered = BufferedWriter(raw, len(payload)*2)
    wrapper = self.TextIOWrapper(buffered, encoding='ascii',
                                 write_through=True)
    # write to the buffered io but don't overflow the buffer
    text = payload.decode('ascii')
    wrapper.write(text)

    # buffer.flush is not called with write_through=True
    self.assertFalse(flush_log)
    # buffer.write *is* called with write_through=True
    self.assertTrue(write_log)
    self.assertEqual(raw.getvalue(), b"")  # no flush

    write_log.clear()  # reset
    wrapper.write(text * 10)  # total content is larger than bufio buffer
    self.assertTrue(write_log)
    self.assertEqual(raw.getvalue(), payload * 11)  # all flushed
3139
def test_read_nonbytes(self):
    # Issue #17106
    # Crash when underlying read() returns non-bytes
    attempts = (
        ("read", (1,)),
        ("readline", ()),
        ("read", ()),
    )
    for method_name, arguments in attempts:
        # A fresh wrapper around a StringIO for every attempt.
        wrapper = self.TextIOWrapper(self.StringIO('a'))
        self.assertRaises(TypeError, getattr(wrapper, method_name),
                          *arguments)
3149
def test_illegal_decoder(self):
    # Issue #17106
    # Bypass the early encoding check added in issue 20404
    def _make_illegal_wrapper():
        codec_info = codecs.lookup("quopri")
        # Flag the codec as a text encoding only while the wrapper is
        # being constructed, then restore the flag unconditionally.
        codec_info._is_text_encoding = True
        try:
            wrapper = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
                                         newline='\n', encoding="quopri")
        finally:
            codec_info._is_text_encoding = False
        return wrapper

    # Crash when decoder returns non-string
    attempts = (
        ("read", (1,)),
        ("readline", ()),
        ("read", ()),
    )
    for method_name, arguments in attempts:
        wrapper = _make_illegal_wrapper()
        self.assertRaises(TypeError, getattr(wrapper, method_name),
                          *arguments)
3169
def _check_create_at_shutdown(self, **kwargs):
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    #
    # Runs a child script (via assert_python_ok) whose object "c"
    # builds a TextIOWrapper with **kwargs in __del__ and prints "ok";
    # returns whatever assert_python_ok returns.
    template = """if 1:
            import codecs
            import {iomod} as io

            # Avoid looking up codecs at shutdown
            codecs.lookup('utf-8')

            class C:
                def __init__(self):
                    self.buf = io.BytesIO()
                def __del__(self):
                    io.TextIOWrapper(self.buf, **{kwargs})
                    print("ok")
            c = C()
            """
    # {kwargs} is filled with the dict's repr, i.e. a dict literal.
    script = template.format(iomod=self.io.__name__, kwargs=kwargs)
    return assert_python_ok("-c", script)
3190
@support.requires_type_collecting
def test_create_at_shutdown_without_encoding(self):
    # Without an explicit encoding the child either prints "ok" or
    # fails with the implementation's shutdown_error message.
    _, stdout, stderr = self._check_create_at_shutdown()
    if not stderr:
        self.assertEqual("ok", stdout.decode().strip())
        return
    # Can error out with a RuntimeError if the module state
    # isn't found.
    self.assertIn(self.shutdown_error, stderr.decode())
3200
@support.requires_type_collecting
def test_create_at_shutdown_with_encoding(self):
    # With encoding and errors given explicitly the child must print
    # "ok" and write nothing to stderr.
    options = {'encoding': 'utf-8', 'errors': 'strict'}
    _, stdout, stderr = self._check_create_at_shutdown(**options)
    self.assertFalse(stderr)
    self.assertEqual("ok", stdout.decode().strip())
3207
def test_read_byteslike(self):
    # The underlying object hands memoryviews, not bytes, to the wrapper.
    source = MemviewBytesIO(b'Just some random string\n')
    wrapper = self.TextIOWrapper(source, 'utf-8')

    # TextIOwrapper will not read the full string, because
    # we truncate it to a multiple of the native int size
    # so that we can construct a more complex memoryview.
    truncated = _to_memoryview(source.getvalue()).tobytes()
    expected_text = truncated.decode('utf-8')

    self.assertEqual(wrapper.read(200), expected_text)
3218
def test_issue22849(self):
    # F claims to be readable, writable and seekable but implements
    # none of the actual I/O methods.
    class F(object):
        def readable(self):
            return True

        def writable(self):
            return True

        def seekable(self):
            return True

    # Construct a wrapper ten times, ignoring any exception it raises.
    # NOTE(review): presumably the point is that repeated failed
    # constructions must not crash; confirm against issue 22849.
    for _ in range(10):
        try:
            self.TextIOWrapper(F(), encoding='utf-8')
        except Exception:
            pass

    # Once F has a tell() method, construction is expected not to raise.
    F.tell = lambda x: 0
    t = self.TextIOWrapper(F(), encoding='utf-8')
3233
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003234
class MemviewBytesIO(io.BytesIO):
    '''A BytesIO object whose read method returns memoryviews
       rather than bytes'''

    # The size argument defaults to -1, like the io.BytesIO methods being
    # overridden, so read() and read1() can also be called with no
    # argument.  Explicit sizes behave as before.

    def read1(self, len_=-1):
        # Wrap whatever BytesIO.read1() returns in a memoryview.
        return _to_memoryview(super().read1(len_))

    def read(self, len_=-1):
        # Wrap whatever BytesIO.read() returns in a memoryview.
        return _to_memoryview(super().read(len_))

def _to_memoryview(buf):
    '''Convert bytes-object *buf* to a non-trivial memoryview'''

    arr = array.array('i')
    # Only whole array items can be stored: drop the trailing bytes that
    # do not fill a complete item of arr.itemsize bytes.
    idx = len(buf) - len(buf) % arr.itemsize
    arr.frombytes(buf[:idx])
    return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003252
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003253
class CTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests against the "io" module and
    # adds three tests of its own.
    io = io
    # Message looked for in the child's stderr by
    # test_create_at_shutdown_without_encoding.
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        # Re-running __init__ with a bad newline fails, and so does a
        # read() attempted afterwards.
        with self.assertRaises(ValueError):
            wrapper.__init__(buffered, newline='xyzzy')
        with self.assertRaises(ValueError):
            wrapper.read()

        # repr() of an object created by __new__ alone must raise.
        blank = self.TextIOWrapper.__new__(self.TextIOWrapper)
        with self.assertRaises(Exception):
            repr(blank)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            raw_file = io.FileIO(support.TESTFN, "wb")
            buffered = self.BufferedWriter(raw_file)
            wrapper = self.TextIOWrapper(buffered, encoding="ascii")
            wrapper.write("456def")
            # Self-reference: the wrapper is now part of a cycle.
            wrapper.x = wrapper
            probe = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertIsNone(probe(), probe)
        with self.open(support.TESTFN, "rb") as reader:
            self.assertEqual(reader.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()
3298
3299
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests against the "pyio" module.
    io = pyio
    # Message looked for in the child's stderr by
    # test_create_at_shutdown_without_encoding.
    shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003303
3304
class IncrementalNewlineDecoderTest(unittest.TestCase):

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(data, expected, **kwargs):
            # We exercise getstate() / setstate() as well as decode():
            # decode once, rewind to the saved state, decode again.
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(data, **kwargs), expected)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(data, **kwargs), expected)

        def _run(table):
            # Each row is (input bytes, expected text, final flag).
            for data, expected, final in table:
                kwargs = {"final": True} if final else {}
                _check_decode(data, expected, **kwargs)

        # One three-byte character, whole and then split byte by byte,
        # ending with a lone lead byte that is left pending.
        _run([
            (b'\xe8\xa2\x88', "\u8888", False),

            (b'\xe8', "", False),
            (b'\xa2', "", False),
            (b'\x88', "\u8888", False),

            (b'\xe8', "", False),
            (b'\xa2', "", False),
            (b'\x88', "\u8888", False),

            (b'\xe8', "", False),
        ])
        # Finalising with the lead byte still pending must fail.
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        _run([
            (b'\n', "\n", False),
            (b'\r', "", False),
            (b'', "\n", True),
            (b'\r', "\n", True),

            (b'\r', "", False),
            (b'a', "\na", False),

            (b'\r\r\n', "\n\n", False),
            (b'\r', "", False),
            (b'\r', "\n", False),
            (b'\na', "\na", False),

            (b'\xe8\xa2\x88\r\n', "\u8888\n", False),
            (b'\xe8\xa2\x88', "\u8888", False),
            (b'\n', "\n", False),
            (b'\xe8\xa2\x88\r', "\u8888", False),
            (b'\n', "\n", False),
        ])

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to the decoder one unit at a time and check the
        # "newlines" attribute after each step.  With encoding=None the
        # decoder is fed str characters, otherwise single encoded bytes.
        output = []
        if encoding is None:
            encoder = None
        else:
            encoder = codecs.getincrementalencoder(encoding)()

        def _feed(text):
            if encoder is None:
                # Decode one char at a time
                units = text
            else:
                # Decode one byte at a time
                units = [bytes([byte]) for byte in encoder.encode(text)]
            for unit in units:
                output.append(decoder.decode(unit))

        self.assertEqual(decoder.newlines, None)
        steps = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, expected_newlines in steps:
            _feed(text)
            self.assertEqual(decoder.newlines, expected_newlines)
        _feed("abc\r")
        self.assertEqual("".join(output), "abc\n\nabcabc\nabcabc")

        # After reset() the decoder must have forgotten the newlines seen.
        decoder.reset()
        sample = "abc"
        if encoder is not None:
            encoder.reset()
            sample = encoder.encode(sample)
        self.assertEqual(decoder.decode(sample), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                inner = None
            else:
                inner = codecs.getincrementaldecoder(enc)()
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)
        # The UTF-8 specific checks get a fresh decoder of their own.
        inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(wrapped)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        # U+0D00 and U+0A00 must pass through unchanged and must not be
        # recorded as newlines, with or without translation.
        for translate in (False, True):
            dec = self.IncrementalNewlineDecoder(None, translate=translate)
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)
3410
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; every test is inherited from
    # IncrementalNewlineDecoderTest.
    pass
3413
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; every test is inherited from
    # IncrementalNewlineDecoderTest.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003416
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003417
Guido van Rossum01a27522007-03-07 01:00:12 +00003418# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003419
Guido van Rossum5abbf752007-08-27 17:39:33 +00003420class MiscIOTest(unittest.TestCase):
3421
def tearDown(self):
    # Remove the scratch file the tests in this class write to.
    scratch_path = support.TESTFN
    support.unlink(scratch_path)
3424
def test___all__(self):
    # Every name listed in __all__ must exist on the module.  "open" gets
    # no further check; names containing "error" (any case) and
    # UnsupportedOperation must be Exception subclasses; everything else
    # except the SEEK_* names must be an IOBase subclass.
    module = self.io
    for name in module.__all__:
        exported = getattr(module, name, None)
        self.assertIsNotNone(exported, name)
        if name == "open":
            continue
        names_an_exception = ("error" in name.lower()
                              or name == "UnsupportedOperation")
        if names_an_exception:
            self.assertTrue(issubclass(exported, Exception), name)
        elif not name.startswith("SEEK_"):
            self.assertTrue(issubclass(exported, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003435
Barry Warsaw40e82462008-11-20 20:14:50 +00003436 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003437 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003438 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003439 f.close()
3440
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003441 with support.check_warnings(('', DeprecationWarning)):
3442 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003443 self.assertEqual(f.name, support.TESTFN)
3444 self.assertEqual(f.buffer.name, support.TESTFN)
3445 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3446 self.assertEqual(f.mode, "U")
3447 self.assertEqual(f.buffer.mode, "rb")
3448 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003449 f.close()
3450
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003451 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003452 self.assertEqual(f.mode, "w+")
3453 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3454 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003455
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003456 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003457 self.assertEqual(g.mode, "wb")
3458 self.assertEqual(g.raw.mode, "wb")
3459 self.assertEqual(g.name, f.fileno())
3460 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003461 f.close()
3462 g.close()
3463
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003464 def test_io_after_close(self):
3465 for kwargs in [
3466 {"mode": "w"},
3467 {"mode": "wb"},
3468 {"mode": "w", "buffering": 1},
3469 {"mode": "w", "buffering": 2},
3470 {"mode": "wb", "buffering": 0},
3471 {"mode": "r"},
3472 {"mode": "rb"},
3473 {"mode": "r", "buffering": 1},
3474 {"mode": "r", "buffering": 2},
3475 {"mode": "rb", "buffering": 0},
3476 {"mode": "w+"},
3477 {"mode": "w+b"},
3478 {"mode": "w+", "buffering": 1},
3479 {"mode": "w+", "buffering": 2},
3480 {"mode": "w+b", "buffering": 0},
3481 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003482 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003483 f.close()
3484 self.assertRaises(ValueError, f.flush)
3485 self.assertRaises(ValueError, f.fileno)
3486 self.assertRaises(ValueError, f.isatty)
3487 self.assertRaises(ValueError, f.__iter__)
3488 if hasattr(f, "peek"):
3489 self.assertRaises(ValueError, f.peek, 1)
3490 self.assertRaises(ValueError, f.read)
3491 if hasattr(f, "read1"):
3492 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003493 if hasattr(f, "readall"):
3494 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003495 if hasattr(f, "readinto"):
3496 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003497 if hasattr(f, "readinto1"):
3498 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003499 self.assertRaises(ValueError, f.readline)
3500 self.assertRaises(ValueError, f.readlines)
Xiang Zhangd5fa5f32017-04-15 13:25:15 +08003501 self.assertRaises(ValueError, f.readlines, 1)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003502 self.assertRaises(ValueError, f.seek, 0)
3503 self.assertRaises(ValueError, f.tell)
3504 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003505 self.assertRaises(ValueError, f.write,
3506 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003507 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003508 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003509
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003510 def test_blockingioerror(self):
3511 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003512 class C(str):
3513 pass
3514 c = C("")
3515 b = self.BlockingIOError(1, c)
3516 c.b = b
3517 b.c = c
3518 wr = weakref.ref(c)
3519 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003520 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003521 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003522
3523 def test_abcs(self):
3524 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003525 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3526 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3527 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3528 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003529
3530 def _check_abc_inheritance(self, abcmodule):
3531 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003532 self.assertIsInstance(f, abcmodule.IOBase)
3533 self.assertIsInstance(f, abcmodule.RawIOBase)
3534 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3535 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003536 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003537 self.assertIsInstance(f, abcmodule.IOBase)
3538 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3539 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3540 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003541 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003542 self.assertIsInstance(f, abcmodule.IOBase)
3543 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3544 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3545 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003546
3547 def test_abc_inheritance(self):
3548 # Test implementations inherit from their respective ABCs
3549 self._check_abc_inheritance(self)
3550
3551 def test_abc_inheritance_official(self):
3552 # Test implementations inherit from the official ABCs of the
3553 # baseline "io" module.
3554 self._check_abc_inheritance(io)
3555
Antoine Pitroue033e062010-10-29 10:38:18 +00003556 def _check_warn_on_dealloc(self, *args, **kwargs):
3557 f = open(*args, **kwargs)
3558 r = repr(f)
3559 with self.assertWarns(ResourceWarning) as cm:
3560 f = None
3561 support.gc_collect()
3562 self.assertIn(r, str(cm.warning.args[0]))
3563
3564 def test_warn_on_dealloc(self):
3565 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3566 self._check_warn_on_dealloc(support.TESTFN, "wb")
3567 self._check_warn_on_dealloc(support.TESTFN, "w")
3568
3569 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3570 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003571 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003572 for fd in fds:
3573 try:
3574 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003575 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003576 if e.errno != errno.EBADF:
3577 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003578 self.addCleanup(cleanup_fds)
3579 r, w = os.pipe()
3580 fds += r, w
3581 self._check_warn_on_dealloc(r, *args, **kwargs)
3582 # When using closefd=False, there's no warning
3583 r, w = os.pipe()
3584 fds += r, w
Serhiy Storchaka94a619d2016-02-11 13:11:44 +02003585 with support.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00003586 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00003587
3588 def test_warn_on_dealloc_fd(self):
3589 self._check_warn_on_dealloc_fd("rb", buffering=0)
3590 self._check_warn_on_dealloc_fd("rb")
3591 self._check_warn_on_dealloc_fd("r")
3592
3593
Antoine Pitrou243757e2010-11-05 21:15:39 +00003594 def test_pickling(self):
3595 # Pickling file objects is forbidden
3596 for kwargs in [
3597 {"mode": "w"},
3598 {"mode": "wb"},
3599 {"mode": "wb", "buffering": 0},
3600 {"mode": "r"},
3601 {"mode": "rb"},
3602 {"mode": "rb", "buffering": 0},
3603 {"mode": "w+"},
3604 {"mode": "w+b"},
3605 {"mode": "w+b", "buffering": 0},
3606 ]:
3607 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3608 with self.open(support.TESTFN, **kwargs) as f:
3609 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3610
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003611 def test_nonblock_pipe_write_bigbuf(self):
3612 self._test_nonblock_pipe_write(16*1024)
3613
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003614 def test_nonblock_pipe_write_smallbuf(self):
3615 self._test_nonblock_pipe_write(1024)
3616
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003617 @unittest.skipUnless(hasattr(os, 'set_blocking'),
3618 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003619 def _test_nonblock_pipe_write(self, bufsize):
3620 sent = []
3621 received = []
3622 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003623 os.set_blocking(r, False)
3624 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003625
3626 # To exercise all code paths in the C implementation we need
3627 # to play with buffer sizes. For instance, if we choose a
3628 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3629 # then we will never get a partial write of the buffer.
3630 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3631 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3632
3633 with rf, wf:
3634 for N in 9999, 73, 7574:
3635 try:
3636 i = 0
3637 while True:
3638 msg = bytes([i % 26 + 97]) * N
3639 sent.append(msg)
3640 wf.write(msg)
3641 i += 1
3642
3643 except self.BlockingIOError as e:
3644 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003645 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003646 sent[-1] = sent[-1][:e.characters_written]
3647 received.append(rf.read())
3648 msg = b'BLOCKED'
3649 wf.write(msg)
3650 sent.append(msg)
3651
3652 while True:
3653 try:
3654 wf.flush()
3655 break
3656 except self.BlockingIOError as e:
3657 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003658 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003659 self.assertEqual(e.characters_written, 0)
3660 received.append(rf.read())
3661
3662 received += iter(rf.read, None)
3663
3664 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003665 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003666 self.assertTrue(wf.closed)
3667 self.assertTrue(rf.closed)
3668
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003669 def test_create_fail(self):
3670 # 'x' mode fails if file is existing
3671 with self.open(support.TESTFN, 'w'):
3672 pass
3673 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3674
3675 def test_create_writes(self):
3676 # 'x' mode opens for writing
3677 with self.open(support.TESTFN, 'xb') as f:
3678 f.write(b"spam")
3679 with self.open(support.TESTFN, 'rb') as f:
3680 self.assertEqual(b"spam", f.read())
3681
Christian Heimes7b648752012-09-10 14:48:43 +02003682 def test_open_allargs(self):
3683 # there used to be a buffer overflow in the parser for rawmode
3684 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3685
3686
class CMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the ``io`` module, plus tests specific to it."""

    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        class OversizedReader(self.io.BufferedIOBase):
            # read() hands back far more data than any caller asked for.
            def read(self, n=-1):
                return b'x' * 10**6

        target = bytearray(2)
        with self.assertRaises(ValueError):
            OversizedReader().readinto(target)

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        code = """if 1:
            import sys
            import time
            import threading

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """.format(stream_name=stream_name)
        result, _ = run_python_until_end("-c", code)
        stderr_text = result.err.decode()
        if result.rc == 0:
            # Clean exit: stderr may hold nothing but the written markers.
            self.assertFalse(stderr_text.strip('.!'))
            return
        # Failure: should be a fatal error
        expected = ("Fatal Python error: could not acquire lock "
                    "for <_io.BufferedWriter name='<{stream_name}>'> "
                    "at interpreter shutdown, possibly due to "
                    "daemon threads".format(stream_name=stream_name))
        self.assertIn(expected, stderr_text)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stdout')

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stderr')
3740
3741
class PyMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the ``pyio`` module."""

    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003744
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003745
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Checks of how io objects behave when SIGALRM interrupts a call.

    Concrete subclasses set ``io`` to the module under test.  Every helper
    that arms ``signal.alarm()`` cancels it again in a ``finally`` clause,
    so that a failing test cannot leave an alarm pending after tearDown()
    has restored the previous SIGALRM handler.
    """

    def setUp(self):
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Default SIGALRM handler for these tests: raise ZeroDivisionError.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter."""
        read_results = []
        def _read():
            # Keep SIGALRM away from this thread where the platform allows.
            if hasattr(signal, 'pthread_sigmask'):
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the alarm armed just below.  Since
            # the other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                signal.alarm(0)
                t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            try:
                wio.close()
            except OSError as e:
                if e.errno != errno.EBADF:
                    raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    # Issue #22331: The test hangs on FreeBSD 7.2
    @support.requires_freebsd_version(8)
    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check what happens when a signal handler writes to the same
        file object that the interrupted code is writing to."""
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Cancel the alarm first: if the block above failed early, the
            # handler must not fire once wio has been closed.
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully."""
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Cancel the alarm first: the handler writes to ``w``, which is
            # about to be closed.
            signal.alarm(0)
            rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully."""
        select = support.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        error = None
        def _read():
            nonlocal error
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                # Hand the failure over to the main thread, which asserts
                # on it after joining this thread.
                error = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: only re-arm, with alarm2 as the next handler.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start draining the pipe so write() can finish.
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error)
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Cancel any alarm still pending after an early failure.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            try:
                wio.close()
            except OSError as e:
                if e.errno != errno.EBADF:
                    raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3953
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003954
class CSignalsTest(SignalsTest):
    """SignalsTest bound to the ``io`` module."""

    io = io
3957
class PySignalsTest(SignalsTest):
    """SignalsTest bound to the ``pyio`` module."""

    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3965
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003966
def load_tests(*args):
    """Build the test suite for this module.

    Before the suite is assembled, every test class whose name starts with
    "C" receives the names of the ``io`` module as class attributes, and
    every class whose name starts with "Py" receives those of ``pyio``.
    Each group also gets the matching "C"/"Py" flavour of the mock classes,
    stored under the mock's unprefixed name.

    The positional arguments are accepted and ignored.
    Returns a ``unittest.TestSuite`` holding the tests of all listed classes.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    globs = globals()
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            namespace = c_io_ns
        elif test.__name__.startswith("Py"):
            namespace = py_io_ns
        else:
            # Implementation-independent test classes get nothing injected.
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    # unittest.makeSuite() is deprecated; a fresh TestLoader collects the
    # same "test*" methods, in the same order.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite(loader.loadTestsFromTestCase(test)
                               for test in tests)
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004002
# Run the tests when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()