blob: 83d6c4e57ecbeed29b3873a1e28d0dfcb07cdce6 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Berker Peksagce643912015-05-06 06:33:17 +030038from test.support.script_helper import assert_python_ok, run_python_until_end
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000047
Martin Panter6bb91f32016-05-28 00:41:57 +000048try:
49 import ctypes
50except ImportError:
51 def byteslike(*pos, **kw):
52 return array.array("b", bytes(*pos, **kw))
53else:
54 def byteslike(*pos, **kw):
55 """Create a bytes-like object having no string or sequence methods"""
56 data = bytes(*pos, **kw)
57 obj = EmptyStruct()
58 ctypes.resize(obj, len(data))
59 memoryview(obj).cast("B")[:] = data
60 return obj
61 class EmptyStruct(ctypes.Structure):
62 pass
63
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000064def _default_chunk_size():
65 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000066 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000067 return f._CHUNK_SIZE
68
69
Antoine Pitrou328ec742010-09-14 18:37:24 +000070class MockRawIOWithoutRead:
71 """A RawIO implementation without read(), so as to exercise the default
72 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000073
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000074 def __init__(self, read_stack=()):
75 self._read_stack = list(read_stack)
76 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000077 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000078 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000079
Guido van Rossum01a27522007-03-07 01:00:12 +000080 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000081 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000082 return len(b)
83
84 def writable(self):
85 return True
86
Guido van Rossum68bbcd22007-02-27 17:19:33 +000087 def fileno(self):
88 return 42
89
90 def readable(self):
91 return True
92
Guido van Rossum01a27522007-03-07 01:00:12 +000093 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000094 return True
95
Guido van Rossum01a27522007-03-07 01:00:12 +000096 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000097 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000098
99 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000100 return 0 # same comment as above
101
102 def readinto(self, buf):
103 self._reads += 1
104 max_len = len(buf)
105 try:
106 data = self._read_stack[0]
107 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000108 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000109 return 0
110 if data is None:
111 del self._read_stack[0]
112 return None
113 n = len(data)
114 if len(data) <= max_len:
115 del self._read_stack[0]
116 buf[:n] = data
117 return n
118 else:
119 buf[:] = data[:max_len]
120 self._read_stack[0] = data[max_len:]
121 return max_len
122
123 def truncate(self, pos=None):
124 return pos
125
Antoine Pitrou328ec742010-09-14 18:37:24 +0000126class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
127 pass
128
129class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
130 pass
131
132
133class MockRawIO(MockRawIOWithoutRead):
134
135 def read(self, n=None):
136 self._reads += 1
137 try:
138 return self._read_stack.pop(0)
139 except:
140 self._extraneous_reads += 1
141 return b""
142
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000143class CMockRawIO(MockRawIO, io.RawIOBase):
144 pass
145
146class PyMockRawIO(MockRawIO, pyio.RawIOBase):
147 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000148
Guido van Rossuma9e20242007-03-08 00:43:48 +0000149
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000150class MisbehavedRawIO(MockRawIO):
151 def write(self, b):
152 return super().write(b) * 2
153
154 def read(self, n=None):
155 return super().read(n) * 2
156
157 def seek(self, pos, whence):
158 return -123
159
160 def tell(self):
161 return -456
162
163 def readinto(self, buf):
164 super().readinto(buf)
165 return len(buf) * 5
166
167class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
168 pass
169
170class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
171 pass
172
173
174class CloseFailureIO(MockRawIO):
175 closed = 0
176
177 def close(self):
178 if not self.closed:
179 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200180 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000181
182class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
183 pass
184
185class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
186 pass
187
188
189class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000190
191 def __init__(self, data):
192 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000193 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000194
195 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000196 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000197 self.read_history.append(None if res is None else len(res))
198 return res
199
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000200 def readinto(self, b):
201 res = super().readinto(b)
202 self.read_history.append(res)
203 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000204
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000205class CMockFileIO(MockFileIO, io.BytesIO):
206 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000207
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000208class PyMockFileIO(MockFileIO, pyio.BytesIO):
209 pass
210
211
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000212class MockUnseekableIO:
213 def seekable(self):
214 return False
215
216 def seek(self, *args):
217 raise self.UnsupportedOperation("not seekable")
218
219 def tell(self, *args):
220 raise self.UnsupportedOperation("not seekable")
221
Martin Panter754aab22016-03-31 07:21:56 +0000222 def truncate(self, *args):
223 raise self.UnsupportedOperation("not seekable")
224
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000225class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
226 UnsupportedOperation = io.UnsupportedOperation
227
228class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
229 UnsupportedOperation = pyio.UnsupportedOperation
230
231
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000232class MockNonBlockWriterIO:
233
234 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000235 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000236 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000237
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000238 def pop_written(self):
239 s = b"".join(self._write_stack)
240 self._write_stack[:] = []
241 return s
242
243 def block_on(self, char):
244 """Block when a given char is encountered."""
245 self._blocker_char = char
246
247 def readable(self):
248 return True
249
250 def seekable(self):
251 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000252
Guido van Rossum01a27522007-03-07 01:00:12 +0000253 def writable(self):
254 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000255
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000256 def write(self, b):
257 b = bytes(b)
258 n = -1
259 if self._blocker_char:
260 try:
261 n = b.index(self._blocker_char)
262 except ValueError:
263 pass
264 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100265 if n > 0:
266 # write data up to the first blocker
267 self._write_stack.append(b[:n])
268 return n
269 else:
270 # cancel blocker and indicate would block
271 self._blocker_char = None
272 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000273 self._write_stack.append(b)
274 return len(b)
275
276class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
277 BlockingIOError = io.BlockingIOError
278
279class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
280 BlockingIOError = pyio.BlockingIOError
281
Guido van Rossuma9e20242007-03-08 00:43:48 +0000282
Guido van Rossum28524c72007-02-27 05:47:44 +0000283class IOTest(unittest.TestCase):
284
Neal Norwitze7789b12008-03-24 06:18:09 +0000285 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000286 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000287
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000288 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000289 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000290
Guido van Rossum28524c72007-02-27 05:47:44 +0000291 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000292 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000293 f.truncate(0)
294 self.assertEqual(f.tell(), 5)
295 f.seek(0)
296
297 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000298 self.assertEqual(f.seek(0), 0)
299 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000300 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000301 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000302 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000303 buffer = bytearray(b" world\n\n\n")
304 self.assertEqual(f.write(buffer), 9)
305 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000306 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000307 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000308 self.assertEqual(f.seek(-1, 2), 13)
309 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000310
Guido van Rossum87429772007-04-10 21:06:59 +0000311 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000312 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000313 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000314
Guido van Rossum9b76da62007-04-11 01:09:03 +0000315 def read_ops(self, f, buffered=False):
316 data = f.read(5)
317 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000318 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000319 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000320 self.assertEqual(bytes(data), b" worl")
321 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000322 self.assertEqual(f.readinto(data), 2)
323 self.assertEqual(len(data), 5)
324 self.assertEqual(data[:2], b"d\n")
325 self.assertEqual(f.seek(0), 0)
326 self.assertEqual(f.read(20), b"hello world\n")
327 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000328 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000329 self.assertEqual(f.seek(-6, 2), 6)
330 self.assertEqual(f.read(5), b"world")
331 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000332 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000333 self.assertEqual(f.seek(-6, 1), 5)
334 self.assertEqual(f.read(5), b" worl")
335 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000336 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000337 if buffered:
338 f.seek(0)
339 self.assertEqual(f.read(), b"hello world\n")
340 f.seek(6)
341 self.assertEqual(f.read(), b"world\n")
342 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000343 f.seek(0)
344 data = byteslike(5)
345 self.assertEqual(f.readinto1(data), 5)
346 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000347
Guido van Rossum34d69e52007-04-10 20:08:41 +0000348 LARGE = 2**31
349
Guido van Rossum53807da2007-04-10 19:01:47 +0000350 def large_file_ops(self, f):
351 assert f.readable()
352 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100353 try:
354 self.assertEqual(f.seek(self.LARGE), self.LARGE)
355 except (OverflowError, ValueError):
356 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000357 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000358 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000359 self.assertEqual(f.tell(), self.LARGE + 3)
360 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000361 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000362 self.assertEqual(f.tell(), self.LARGE + 2)
363 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000364 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000365 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000366 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
367 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000368 self.assertEqual(f.read(2), b"x")
369
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000370 def test_invalid_operations(self):
371 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000372 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000373 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000374 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000375 self.assertRaises(exc, fp.read)
376 self.assertRaises(exc, fp.readline)
377 with self.open(support.TESTFN, "wb", buffering=0) as fp:
378 self.assertRaises(exc, fp.read)
379 self.assertRaises(exc, fp.readline)
380 with self.open(support.TESTFN, "rb", buffering=0) as fp:
381 self.assertRaises(exc, fp.write, b"blah")
382 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000383 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000384 self.assertRaises(exc, fp.write, b"blah")
385 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000386 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000387 self.assertRaises(exc, fp.write, "blah")
388 self.assertRaises(exc, fp.writelines, ["blah\n"])
389 # Non-zero seeking from current or end pos
390 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
391 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000392
Martin Panter754aab22016-03-31 07:21:56 +0000393 def test_optional_abilities(self):
394 # Test for OSError when optional APIs are not supported
395 # The purpose of this test is to try fileno(), reading, writing and
396 # seeking operations with various objects that indicate they do not
397 # support these operations.
398
399 def pipe_reader():
400 [r, w] = os.pipe()
401 os.close(w) # So that read() is harmless
402 return self.FileIO(r, "r")
403
404 def pipe_writer():
405 [r, w] = os.pipe()
406 self.addCleanup(os.close, r)
407 # Guarantee that we can write into the pipe without blocking
408 thread = threading.Thread(target=os.read, args=(r, 100))
409 thread.start()
410 self.addCleanup(thread.join)
411 return self.FileIO(w, "w")
412
413 def buffered_reader():
414 return self.BufferedReader(self.MockUnseekableIO())
415
416 def buffered_writer():
417 return self.BufferedWriter(self.MockUnseekableIO())
418
419 def buffered_random():
420 return self.BufferedRandom(self.BytesIO())
421
422 def buffered_rw_pair():
423 return self.BufferedRWPair(self.MockUnseekableIO(),
424 self.MockUnseekableIO())
425
426 def text_reader():
427 class UnseekableReader(self.MockUnseekableIO):
428 writable = self.BufferedIOBase.writable
429 write = self.BufferedIOBase.write
430 return self.TextIOWrapper(UnseekableReader(), "ascii")
431
432 def text_writer():
433 class UnseekableWriter(self.MockUnseekableIO):
434 readable = self.BufferedIOBase.readable
435 read = self.BufferedIOBase.read
436 return self.TextIOWrapper(UnseekableWriter(), "ascii")
437
438 tests = (
439 (pipe_reader, "fr"), (pipe_writer, "fw"),
440 (buffered_reader, "r"), (buffered_writer, "w"),
441 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
442 (text_reader, "r"), (text_writer, "w"),
443 (self.BytesIO, "rws"), (self.StringIO, "rws"),
444 )
445 for [test, abilities] in tests:
446 if test is pipe_writer and not threading:
447 continue # Skip subtest that uses a background thread
448 with self.subTest(test), test() as obj:
449 readable = "r" in abilities
450 self.assertEqual(obj.readable(), readable)
451 writable = "w" in abilities
452 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000453
454 if isinstance(obj, self.TextIOBase):
455 data = "3"
456 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
457 data = b"3"
458 else:
459 self.fail("Unknown base class")
460
461 if "f" in abilities:
462 obj.fileno()
463 else:
464 self.assertRaises(OSError, obj.fileno)
465
466 if readable:
467 obj.read(1)
468 obj.read()
469 else:
470 self.assertRaises(OSError, obj.read, 1)
471 self.assertRaises(OSError, obj.read)
472
473 if writable:
474 obj.write(data)
475 else:
476 self.assertRaises(OSError, obj.write, data)
477
Martin Panter3ee147f2016-03-31 21:05:31 +0000478 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000479 pipe_reader, pipe_writer):
480 # Pipes seem to appear as seekable on Windows
481 continue
482 seekable = "s" in abilities
483 self.assertEqual(obj.seekable(), seekable)
484
Martin Panter754aab22016-03-31 07:21:56 +0000485 if seekable:
486 obj.tell()
487 obj.seek(0)
488 else:
489 self.assertRaises(OSError, obj.tell)
490 self.assertRaises(OSError, obj.seek, 0)
491
492 if writable and seekable:
493 obj.truncate()
494 obj.truncate(0)
495 else:
496 self.assertRaises(OSError, obj.truncate)
497 self.assertRaises(OSError, obj.truncate, 0)
498
Antoine Pitrou13348842012-01-29 18:36:34 +0100499 def test_open_handles_NUL_chars(self):
500 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300501 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100502
503 bytes_fn = bytes(fn_with_NUL, 'ascii')
504 with warnings.catch_warnings():
505 warnings.simplefilter("ignore", DeprecationWarning)
506 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100507
Guido van Rossum28524c72007-02-27 05:47:44 +0000508 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000509 with self.open(support.TESTFN, "wb", buffering=0) as f:
510 self.assertEqual(f.readable(), False)
511 self.assertEqual(f.writable(), True)
512 self.assertEqual(f.seekable(), True)
513 self.write_ops(f)
514 with self.open(support.TESTFN, "rb", buffering=0) as f:
515 self.assertEqual(f.readable(), True)
516 self.assertEqual(f.writable(), False)
517 self.assertEqual(f.seekable(), True)
518 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000519
Guido van Rossum87429772007-04-10 21:06:59 +0000520 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000521 with self.open(support.TESTFN, "wb") as f:
522 self.assertEqual(f.readable(), False)
523 self.assertEqual(f.writable(), True)
524 self.assertEqual(f.seekable(), True)
525 self.write_ops(f)
526 with self.open(support.TESTFN, "rb") as f:
527 self.assertEqual(f.readable(), True)
528 self.assertEqual(f.writable(), False)
529 self.assertEqual(f.seekable(), True)
530 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000531
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000532 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000533 with self.open(support.TESTFN, "wb") as f:
534 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
535 with self.open(support.TESTFN, "rb") as f:
536 self.assertEqual(f.readline(), b"abc\n")
537 self.assertEqual(f.readline(10), b"def\n")
538 self.assertEqual(f.readline(2), b"xy")
539 self.assertEqual(f.readline(4), b"zzy\n")
540 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000541 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000542 self.assertRaises(TypeError, f.readline, 5.3)
543 with self.open(support.TESTFN, "r") as f:
544 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000545
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300546 def test_readline_nonsizeable(self):
547 # Issue #30061
548 # Crash when readline() returns an object without __len__
549 class R(self.IOBase):
550 def readline(self):
551 return None
552 self.assertRaises((TypeError, StopIteration), next, R())
553
554 def test_next_nonsizeable(self):
555 # Issue #30061
556 # Crash when __next__() returns an object without __len__
557 class R(self.IOBase):
558 def __next__(self):
559 return None
560 self.assertRaises(TypeError, R().readlines, 1)
561
Guido van Rossum28524c72007-02-27 05:47:44 +0000562 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000563 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000564 self.write_ops(f)
565 data = f.getvalue()
566 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000567 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000568 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000569
Guido van Rossum53807da2007-04-10 19:01:47 +0000570 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000571 # On Windows and Mac OSX this test comsumes large resources; It takes
572 # a long time to build the >2GB file and takes >2GB of disk space
573 # therefore the resource must be enabled to run this test.
574 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600575 support.requires(
576 'largefile',
577 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000578 with self.open(support.TESTFN, "w+b", 0) as f:
579 self.large_file_ops(f)
580 with self.open(support.TESTFN, "w+b") as f:
581 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000582
583 def test_with_open(self):
584 for bufsize in (0, 1, 100):
585 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000586 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000587 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000588 self.assertEqual(f.closed, True)
589 f = None
590 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000591 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000592 1/0
593 except ZeroDivisionError:
594 self.assertEqual(f.closed, True)
595 else:
596 self.fail("1/0 didn't raise an exception")
597
Antoine Pitrou08838b62009-01-21 00:55:13 +0000598 # issue 5008
599 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000600 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000601 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000602 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000603 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000604 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000605 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000606 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300607 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000608
Guido van Rossum87429772007-04-10 21:06:59 +0000609 def test_destructor(self):
610 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000611 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000612 def __del__(self):
613 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000614 try:
615 f = super().__del__
616 except AttributeError:
617 pass
618 else:
619 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000620 def close(self):
621 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000622 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000623 def flush(self):
624 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000625 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000626 with support.check_warnings(('', ResourceWarning)):
627 f = MyFileIO(support.TESTFN, "wb")
628 f.write(b"xxx")
629 del f
630 support.gc_collect()
631 self.assertEqual(record, [1, 2, 3])
632 with self.open(support.TESTFN, "rb") as f:
633 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000634
635 def _check_base_destructor(self, base):
636 record = []
637 class MyIO(base):
638 def __init__(self):
639 # This exercises the availability of attributes on object
640 # destruction.
641 # (in the C version, close() is called by the tp_dealloc
642 # function, not by __del__)
643 self.on_del = 1
644 self.on_close = 2
645 self.on_flush = 3
646 def __del__(self):
647 record.append(self.on_del)
648 try:
649 f = super().__del__
650 except AttributeError:
651 pass
652 else:
653 f()
654 def close(self):
655 record.append(self.on_close)
656 super().close()
657 def flush(self):
658 record.append(self.on_flush)
659 super().flush()
660 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000661 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000662 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000663 self.assertEqual(record, [1, 2, 3])
664
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000665 def test_IOBase_destructor(self):
666 self._check_base_destructor(self.IOBase)
667
668 def test_RawIOBase_destructor(self):
669 self._check_base_destructor(self.RawIOBase)
670
671 def test_BufferedIOBase_destructor(self):
672 self._check_base_destructor(self.BufferedIOBase)
673
674 def test_TextIOBase_destructor(self):
675 self._check_base_destructor(self.TextIOBase)
676
Guido van Rossum87429772007-04-10 21:06:59 +0000677 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000678 with self.open(support.TESTFN, "wb") as f:
679 f.write(b"xxx")
680 with self.open(support.TESTFN, "rb") as f:
681 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000682
Guido van Rossumd4103952007-04-12 05:44:49 +0000683 def test_array_writes(self):
684 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000685 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000686 def check(f):
687 with f:
688 self.assertEqual(f.write(a), n)
689 f.writelines((a,))
690 check(self.BytesIO())
691 check(self.FileIO(support.TESTFN, "w"))
692 check(self.BufferedWriter(self.MockRawIO()))
693 check(self.BufferedRandom(self.MockRawIO()))
694 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000695
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000696 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000697 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000698 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000699
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000700 def test_read_closed(self):
701 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000702 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000703 with self.open(support.TESTFN, "r") as f:
704 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000705 self.assertEqual(file.read(), "egg\n")
706 file.seek(0)
707 file.close()
708 self.assertRaises(ValueError, file.read)
709
710 def test_no_closefd_with_filename(self):
711 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000712 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000713
714 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000715 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000716 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000717 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000718 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000719 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000720 self.assertEqual(file.buffer.raw.closefd, False)
721
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000722 def test_garbage_collection(self):
723 # FileIO objects are collected, and collecting them flushes
724 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000725 with support.check_warnings(('', ResourceWarning)):
726 f = self.FileIO(support.TESTFN, "wb")
727 f.write(b"abcxxx")
728 f.f = f
729 wr = weakref.ref(f)
730 del f
731 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300732 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000733 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000734 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000735
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000736 def test_unbounded_file(self):
737 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
738 zero = "/dev/zero"
739 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000740 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000741 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000742 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000743 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000744 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000745 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000746 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000747 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000748 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000749 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000750 self.assertRaises(OverflowError, f.read)
751
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200752 def check_flush_error_on_close(self, *args, **kwargs):
753 # Test that the file is closed despite failed flush
754 # and that flush() is called before file closed.
755 f = self.open(*args, **kwargs)
756 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000757 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200758 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200759 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000760 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200761 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600762 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200763 self.assertTrue(closed) # flush() called
764 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200765 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200766
767 def test_flush_error_on_close(self):
768 # raw file
769 # Issue #5700: io.FileIO calls flush() after file closed
770 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
771 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
772 self.check_flush_error_on_close(fd, 'wb', buffering=0)
773 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
774 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
775 os.close(fd)
776 # buffered io
777 self.check_flush_error_on_close(support.TESTFN, 'wb')
778 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
779 self.check_flush_error_on_close(fd, 'wb')
780 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
781 self.check_flush_error_on_close(fd, 'wb', closefd=False)
782 os.close(fd)
783 # text io
784 self.check_flush_error_on_close(support.TESTFN, 'w')
785 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
786 self.check_flush_error_on_close(fd, 'w')
787 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
788 self.check_flush_error_on_close(fd, 'w', closefd=False)
789 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000790
791 def test_multi_close(self):
792 f = self.open(support.TESTFN, "wb", buffering=0)
793 f.close()
794 f.close()
795 f.close()
796 self.assertRaises(ValueError, f.flush)
797
Antoine Pitrou328ec742010-09-14 18:37:24 +0000798 def test_RawIOBase_read(self):
799 # Exercise the default RawIOBase.read() implementation (which calls
800 # readinto() internally).
801 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
802 self.assertEqual(rawio.read(2), b"ab")
803 self.assertEqual(rawio.read(2), b"c")
804 self.assertEqual(rawio.read(2), b"d")
805 self.assertEqual(rawio.read(2), None)
806 self.assertEqual(rawio.read(2), b"ef")
807 self.assertEqual(rawio.read(2), b"g")
808 self.assertEqual(rawio.read(2), None)
809 self.assertEqual(rawio.read(2), b"")
810
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400811 def test_types_have_dict(self):
812 test = (
813 self.IOBase(),
814 self.RawIOBase(),
815 self.TextIOBase(),
816 self.StringIO(),
817 self.BytesIO()
818 )
819 for obj in test:
820 self.assertTrue(hasattr(obj, "__dict__"))
821
Ross Lagerwall59142db2011-10-31 20:34:46 +0200822 def test_opener(self):
823 with self.open(support.TESTFN, "w") as f:
824 f.write("egg\n")
825 fd = os.open(support.TESTFN, os.O_RDONLY)
826 def opener(path, flags):
827 return fd
828 with self.open("non-existent", "r", opener=opener) as f:
829 self.assertEqual(f.read(), "egg\n")
830
Barry Warsaw480e2852016-06-08 17:47:26 -0400831 def test_bad_opener_negative_1(self):
832 # Issue #27066.
833 def badopener(fname, flags):
834 return -1
835 with self.assertRaises(ValueError) as cm:
836 open('non-existent', 'r', opener=badopener)
837 self.assertEqual(str(cm.exception), 'opener returned -1')
838
839 def test_bad_opener_other_negative(self):
840 # Issue #27066.
841 def badopener(fname, flags):
842 return -2
843 with self.assertRaises(ValueError) as cm:
844 open('non-existent', 'r', opener=badopener)
845 self.assertEqual(str(cm.exception), 'opener returned -2')
846
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200847 def test_fileio_closefd(self):
848 # Issue #4841
849 with self.open(__file__, 'rb') as f1, \
850 self.open(__file__, 'rb') as f2:
851 fileio = self.FileIO(f1.fileno(), closefd=False)
852 # .__init__() must not close f1
853 fileio.__init__(f2.fileno(), closefd=False)
854 f1.readline()
855 # .close() must not close f2
856 fileio.close()
857 f2.readline()
858
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300859 def test_nonbuffered_textio(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200860 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300861 with self.assertRaises(ValueError):
862 self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300863
864 def test_invalid_newline(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200865 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300866 with self.assertRaises(ValueError):
867 self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300868
Martin Panter6bb91f32016-05-28 00:41:57 +0000869 def test_buffered_readinto_mixin(self):
870 # Test the implementation provided by BufferedIOBase
871 class Stream(self.BufferedIOBase):
872 def read(self, size):
873 return b"12345"
874 read1 = read
875 stream = Stream()
876 for method in ("readinto", "readinto1"):
877 with self.subTest(method):
878 buffer = byteslike(5)
879 self.assertEqual(getattr(stream, method)(buffer), 5)
880 self.assertEqual(bytes(buffer), b"12345")
881
Ethan Furmand62548a2016-06-04 14:38:43 -0700882 def test_fspath_support(self):
883 class PathLike:
884 def __init__(self, path):
885 self.path = path
886
887 def __fspath__(self):
888 return self.path
889
890 def check_path_succeeds(path):
891 with self.open(path, "w") as f:
892 f.write("egg\n")
893
894 with self.open(path, "r") as f:
895 self.assertEqual(f.read(), "egg\n")
896
897 check_path_succeeds(PathLike(support.TESTFN))
898 check_path_succeeds(PathLike(support.TESTFN.encode('utf-8')))
899
900 bad_path = PathLike(TypeError)
Brett Cannonc78ca1e2016-06-24 12:03:43 -0700901 with self.assertRaises(TypeError):
Ethan Furmand62548a2016-06-04 14:38:43 -0700902 self.open(bad_path, 'w')
903
904 # ensure that refcounting is correct with some error conditions
905 with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
906 self.open(PathLike(support.TESTFN), 'rwxa')
907
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200908
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000909class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200910
911 def test_IOBase_finalize(self):
912 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
913 # class which inherits IOBase and an object of this class are caught
914 # in a reference cycle and close() is already in the method cache.
915 class MyIO(self.IOBase):
916 def close(self):
917 pass
918
919 # create an instance to populate the method cache
920 MyIO()
921 obj = MyIO()
922 obj.obj = obj
923 wr = weakref.ref(obj)
924 del MyIO
925 del obj
926 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300927 self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000928
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000929class PyIOTest(IOTest):
930 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000931
Guido van Rossuma9e20242007-03-08 00:43:48 +0000932
Gregory P. Smith1bef9072015-04-14 13:24:34 -0700933@support.cpython_only
934class APIMismatchTest(unittest.TestCase):
Gregory P. Smith054b0652015-04-14 12:58:05 -0700935
Gregory P. Smith054b0652015-04-14 12:58:05 -0700936 def test_RawIOBase_io_in_pyio_match(self):
937 """Test that pyio RawIOBase class has all c RawIOBase methods"""
Antoine Pitrou45d61562015-05-20 21:50:59 +0200938 mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
939 ignore=('__weakref__',))
Gregory P. Smith054b0652015-04-14 12:58:05 -0700940 self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
941
942 def test_RawIOBase_pyio_in_io_match(self):
943 """Test that c RawIOBase class has all pyio RawIOBase methods"""
944 mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
945 self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
946
947
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000948class CommonBufferedTests:
949 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
950
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000951 def test_detach(self):
952 raw = self.MockRawIO()
953 buf = self.tp(raw)
954 self.assertIs(buf.detach(), raw)
955 self.assertRaises(ValueError, buf.detach)
956
Benjamin Peterson10e76b62014-12-21 20:51:50 -0600957 repr(buf) # Should still work
958
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000959 def test_fileno(self):
960 rawio = self.MockRawIO()
961 bufio = self.tp(rawio)
962
Ezio Melottib3aedd42010-11-20 19:04:17 +0000963 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000964
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000965 def test_invalid_args(self):
966 rawio = self.MockRawIO()
967 bufio = self.tp(rawio)
968 # Invalid whence
969 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +0200970 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000971
972 def test_override_destructor(self):
973 tp = self.tp
974 record = []
975 class MyBufferedIO(tp):
976 def __del__(self):
977 record.append(1)
978 try:
979 f = super().__del__
980 except AttributeError:
981 pass
982 else:
983 f()
984 def close(self):
985 record.append(2)
986 super().close()
987 def flush(self):
988 record.append(3)
989 super().flush()
990 rawio = self.MockRawIO()
991 bufio = MyBufferedIO(rawio)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000992 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000993 support.gc_collect()
Martin Panter754aab22016-03-31 07:21:56 +0000994 self.assertEqual(record, [1, 2, 3])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000995
996 def test_context_manager(self):
997 # Test usability as a context manager
998 rawio = self.MockRawIO()
999 bufio = self.tp(rawio)
1000 def _with():
1001 with bufio:
1002 pass
1003 _with()
1004 # bufio should now be closed, and using it a second time should raise
1005 # a ValueError.
1006 self.assertRaises(ValueError, _with)
1007
1008 def test_error_through_destructor(self):
1009 # Test that the exception state is not modified by a destructor,
1010 # even if close() fails.
1011 rawio = self.CloseFailureIO()
1012 def f():
1013 self.tp(rawio).xyzzy
1014 with support.captured_output("stderr") as s:
1015 self.assertRaises(AttributeError, f)
1016 s = s.getvalue().strip()
1017 if s:
1018 # The destructor *may* have printed an unraisable error, check it
1019 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001020 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001021 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +00001022
Antoine Pitrou716c4442009-05-23 19:04:03 +00001023 def test_repr(self):
1024 raw = self.MockRawIO()
1025 b = self.tp(raw)
Serhiy Storchaka521e5862014-07-22 15:00:37 +03001026 clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001027 self.assertEqual(repr(b), "<%s>" % clsname)
1028 raw.name = "dummy"
1029 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
1030 raw.name = b"dummy"
1031 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
1032
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02001033 def test_recursive_repr(self):
1034 # Issue #25455
1035 raw = self.MockRawIO()
1036 b = self.tp(raw)
1037 with support.swap_attr(raw, 'name', b):
1038 try:
1039 repr(b) # Should not crash
1040 except RuntimeError:
1041 pass
1042
Antoine Pitrou6be88762010-05-03 16:48:20 +00001043 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001044 # Test that buffered file is closed despite failed flush
1045 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00001046 raw = self.MockRawIO()
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001047 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00001048 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001049 closed[:] = [b.closed, raw.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001050 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00001051 raw.flush = bad_flush
1052 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001053 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001054 self.assertTrue(b.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001055 self.assertTrue(raw.closed)
1056 self.assertTrue(closed) # flush() called
1057 self.assertFalse(closed[0]) # flush() called before file closed
1058 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02001059 raw.flush = lambda: None # break reference loop
Benjamin Peterson68623612012-12-20 11:53:11 -06001060
1061 def test_close_error_on_close(self):
1062 raw = self.MockRawIO()
1063 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001064 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -06001065 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001066 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -06001067 raw.close = bad_close
1068 b = self.tp(raw)
1069 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001070 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001071 b.close()
1072 self.assertEqual(err.exception.args, ('close',))
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001073 self.assertIsInstance(err.exception.__context__, OSError)
Benjamin Peterson68623612012-12-20 11:53:11 -06001074 self.assertEqual(err.exception.__context__.args, ('flush',))
1075 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00001076
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001077 def test_nonnormalized_close_error_on_close(self):
1078 # Issue #21677
1079 raw = self.MockRawIO()
1080 def bad_flush():
1081 raise non_existing_flush
1082 def bad_close():
1083 raise non_existing_close
1084 raw.close = bad_close
1085 b = self.tp(raw)
1086 b.flush = bad_flush
1087 with self.assertRaises(NameError) as err: # exception not swallowed
1088 b.close()
1089 self.assertIn('non_existing_close', str(err.exception))
1090 self.assertIsInstance(err.exception.__context__, NameError)
1091 self.assertIn('non_existing_flush', str(err.exception.__context__))
1092 self.assertFalse(b.closed)
1093
Antoine Pitrou6be88762010-05-03 16:48:20 +00001094 def test_multi_close(self):
1095 raw = self.MockRawIO()
1096 b = self.tp(raw)
1097 b.close()
1098 b.close()
1099 b.close()
1100 self.assertRaises(ValueError, b.flush)
1101
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001102 def test_unseekable(self):
1103 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1104 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1105 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1106
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00001107 def test_readonly_attributes(self):
1108 raw = self.MockRawIO()
1109 buf = self.tp(raw)
1110 x = self.MockRawIO()
1111 with self.assertRaises(AttributeError):
1112 buf.raw = x
1113
Guido van Rossum78892e42007-04-06 17:31:18 +00001114
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001115class SizeofTest:
1116
1117 @support.cpython_only
1118 def test_sizeof(self):
1119 bufsize1 = 4096
1120 bufsize2 = 8192
1121 rawio = self.MockRawIO()
1122 bufio = self.tp(rawio, buffer_size=bufsize1)
1123 size = sys.getsizeof(bufio) - bufsize1
1124 rawio = self.MockRawIO()
1125 bufio = self.tp(rawio, buffer_size=bufsize2)
1126 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
1127
Jesus Ceadc469452012-10-04 12:37:56 +02001128 @support.cpython_only
1129 def test_buffer_freeing(self) :
1130 bufsize = 4096
1131 rawio = self.MockRawIO()
1132 bufio = self.tp(rawio, buffer_size=bufsize)
1133 size = sys.getsizeof(bufio) - bufsize
1134 bufio.close()
1135 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001136
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001137class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1138 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001139
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001140 def test_constructor(self):
1141 rawio = self.MockRawIO([b"abc"])
1142 bufio = self.tp(rawio)
1143 bufio.__init__(rawio)
1144 bufio.__init__(rawio, buffer_size=1024)
1145 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001146 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001147 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1148 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1149 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1150 rawio = self.MockRawIO([b"abc"])
1151 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001152 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001153
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001154 def test_uninitialized(self):
1155 bufio = self.tp.__new__(self.tp)
1156 del bufio
1157 bufio = self.tp.__new__(self.tp)
1158 self.assertRaisesRegex((ValueError, AttributeError),
1159 'uninitialized|has no attribute',
1160 bufio.read, 0)
1161 bufio.__init__(self.MockRawIO())
1162 self.assertEqual(bufio.read(0), b'')
1163
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001164 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001165 for arg in (None, 7):
1166 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1167 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001168 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001169 # Invalid args
1170 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001171
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001172 def test_read1(self):
1173 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1174 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001175 self.assertEqual(b"a", bufio.read(1))
1176 self.assertEqual(b"b", bufio.read1(1))
1177 self.assertEqual(rawio._reads, 1)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001178 self.assertEqual(b"", bufio.read1(0))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001179 self.assertEqual(b"c", bufio.read1(100))
1180 self.assertEqual(rawio._reads, 1)
1181 self.assertEqual(b"d", bufio.read1(100))
1182 self.assertEqual(rawio._reads, 2)
1183 self.assertEqual(b"efg", bufio.read1(100))
1184 self.assertEqual(rawio._reads, 3)
1185 self.assertEqual(b"", bufio.read1(100))
1186 self.assertEqual(rawio._reads, 4)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001187
1188 def test_read1_arbitrary(self):
1189 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1190 bufio = self.tp(rawio)
1191 self.assertEqual(b"a", bufio.read(1))
1192 self.assertEqual(b"bc", bufio.read1())
1193 self.assertEqual(b"d", bufio.read1())
1194 self.assertEqual(b"efg", bufio.read1(-1))
1195 self.assertEqual(rawio._reads, 3)
1196 self.assertEqual(b"", bufio.read1())
1197 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001198
1199 def test_readinto(self):
1200 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1201 bufio = self.tp(rawio)
1202 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001203 self.assertEqual(bufio.readinto(b), 2)
1204 self.assertEqual(b, b"ab")
1205 self.assertEqual(bufio.readinto(b), 2)
1206 self.assertEqual(b, b"cd")
1207 self.assertEqual(bufio.readinto(b), 2)
1208 self.assertEqual(b, b"ef")
1209 self.assertEqual(bufio.readinto(b), 1)
1210 self.assertEqual(b, b"gf")
1211 self.assertEqual(bufio.readinto(b), 0)
1212 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +02001213 rawio = self.MockRawIO((b"abc", None))
1214 bufio = self.tp(rawio)
1215 self.assertEqual(bufio.readinto(b), 2)
1216 self.assertEqual(b, b"ab")
1217 self.assertEqual(bufio.readinto(b), 1)
1218 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001219
Benjamin Petersona96fea02014-06-22 14:17:44 -07001220 def test_readinto1(self):
1221 buffer_size = 10
1222 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1223 bufio = self.tp(rawio, buffer_size=buffer_size)
1224 b = bytearray(2)
1225 self.assertEqual(bufio.peek(3), b'abc')
1226 self.assertEqual(rawio._reads, 1)
1227 self.assertEqual(bufio.readinto1(b), 2)
1228 self.assertEqual(b, b"ab")
1229 self.assertEqual(rawio._reads, 1)
1230 self.assertEqual(bufio.readinto1(b), 1)
1231 self.assertEqual(b[:1], b"c")
1232 self.assertEqual(rawio._reads, 1)
1233 self.assertEqual(bufio.readinto1(b), 2)
1234 self.assertEqual(b, b"de")
1235 self.assertEqual(rawio._reads, 2)
1236 b = bytearray(2*buffer_size)
1237 self.assertEqual(bufio.peek(3), b'fgh')
1238 self.assertEqual(rawio._reads, 3)
1239 self.assertEqual(bufio.readinto1(b), 6)
1240 self.assertEqual(b[:6], b"fghjkl")
1241 self.assertEqual(rawio._reads, 4)
1242
1243 def test_readinto_array(self):
1244 buffer_size = 60
1245 data = b"a" * 26
1246 rawio = self.MockRawIO((data,))
1247 bufio = self.tp(rawio, buffer_size=buffer_size)
1248
1249 # Create an array with element size > 1 byte
1250 b = array.array('i', b'x' * 32)
1251 assert len(b) != 16
1252
1253 # Read into it. We should get as many *bytes* as we can fit into b
1254 # (which is more than the number of elements)
1255 n = bufio.readinto(b)
1256 self.assertGreater(n, len(b))
1257
1258 # Check that old contents of b are preserved
1259 bm = memoryview(b).cast('B')
1260 self.assertLess(n, len(bm))
1261 self.assertEqual(bm[:n], data[:n])
1262 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1263
1264 def test_readinto1_array(self):
1265 buffer_size = 60
1266 data = b"a" * 26
1267 rawio = self.MockRawIO((data,))
1268 bufio = self.tp(rawio, buffer_size=buffer_size)
1269
1270 # Create an array with element size > 1 byte
1271 b = array.array('i', b'x' * 32)
1272 assert len(b) != 16
1273
1274 # Read into it. We should get as many *bytes* as we can fit into b
1275 # (which is more than the number of elements)
1276 n = bufio.readinto1(b)
1277 self.assertGreater(n, len(b))
1278
1279 # Check that old contents of b are preserved
1280 bm = memoryview(b).cast('B')
1281 self.assertLess(n, len(bm))
1282 self.assertEqual(bm[:n], data[:n])
1283 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1284
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001285 def test_readlines(self):
1286 def bufio():
1287 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1288 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001289 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1290 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1291 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001292
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001293 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001294 data = b"abcdefghi"
1295 dlen = len(data)
1296
1297 tests = [
1298 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1299 [ 100, [ 3, 3, 3], [ dlen ] ],
1300 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1301 ]
1302
1303 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001304 rawio = self.MockFileIO(data)
1305 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001306 pos = 0
1307 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001308 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001309 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001310 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001311 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001312
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001313 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +00001314 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001315 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1316 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001317 self.assertEqual(b"abcd", bufio.read(6))
1318 self.assertEqual(b"e", bufio.read(1))
1319 self.assertEqual(b"fg", bufio.read())
1320 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +02001321 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001322 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +00001323
Victor Stinnera80987f2011-05-25 22:47:16 +02001324 rawio = self.MockRawIO((b"a", None, None))
1325 self.assertEqual(b"a", rawio.readall())
1326 self.assertIsNone(rawio.readall())
1327
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001328 def test_read_past_eof(self):
1329 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1330 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001331
Ezio Melottib3aedd42010-11-20 19:04:17 +00001332 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001333
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001334 def test_read_all(self):
1335 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1336 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001337
Ezio Melottib3aedd42010-11-20 19:04:17 +00001338 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001339
Victor Stinner45df8202010-04-28 22:31:17 +00001340 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001341 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001342 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001343 try:
1344 # Write out many bytes with exactly the same number of 0's,
1345 # 1's... 255's. This will help us check that concurrent reading
1346 # doesn't duplicate or forget contents.
1347 N = 1000
1348 l = list(range(256)) * N
1349 random.shuffle(l)
1350 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001351 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +00001352 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001353 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001354 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001355 errors = []
1356 results = []
1357 def f():
1358 try:
1359 # Intra-buffer read then buffer-flushing read
1360 for n in cycle([1, 19]):
1361 s = bufio.read(n)
1362 if not s:
1363 break
1364 # list.append() is atomic
1365 results.append(s)
1366 except Exception as e:
1367 errors.append(e)
1368 raise
1369 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001370 with support.start_threads(threads):
1371 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001372 self.assertFalse(errors,
1373 "the following exceptions were caught: %r" % errors)
1374 s = b''.join(results)
1375 for i in range(256):
1376 c = bytes(bytearray([i]))
1377 self.assertEqual(s.count(c), N)
1378 finally:
1379 support.unlink(support.TESTFN)
1380
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001381 def test_unseekable(self):
1382 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1383 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1384 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1385 bufio.read(1)
1386 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1387 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1388
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001389 def test_misbehaved_io(self):
1390 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1391 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001392 self.assertRaises(OSError, bufio.seek, 0)
1393 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001394
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001395 def test_no_extraneous_read(self):
1396 # Issue #9550; when the raw IO object has satisfied the read request,
1397 # we should not issue any additional reads, otherwise it may block
1398 # (e.g. socket).
1399 bufsize = 16
1400 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1401 rawio = self.MockRawIO([b"x" * n])
1402 bufio = self.tp(rawio, bufsize)
1403 self.assertEqual(bufio.read(n), b"x" * n)
1404 # Simple case: one raw read is enough to satisfy the request.
1405 self.assertEqual(rawio._extraneous_reads, 0,
1406 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1407 # A more complex case where two raw reads are needed to satisfy
1408 # the request.
1409 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1410 bufio = self.tp(rawio, bufsize)
1411 self.assertEqual(bufio.read(n), b"x" * n)
1412 self.assertEqual(rawio._extraneous_reads, 0,
1413 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1414
Berker Peksagd10d6ae2015-05-12 17:01:05 +03001415 def test_read_on_closed(self):
1416 # Issue #23796
1417 b = io.BufferedReader(io.BytesIO(b"12"))
1418 b.read(1)
1419 b.close()
1420 self.assertRaises(ValueError, b.peek)
1421 self.assertRaises(ValueError, b.read1, 1)
1422
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001423
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001424class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001425 tp = io.BufferedReader
1426
1427 def test_constructor(self):
1428 BufferedReaderTest.test_constructor(self)
1429 # The allocation can succeed on 32-bit builds, e.g. with more
1430 # than 2GB RAM and a 64-bit kernel.
1431 if sys.maxsize > 0x7FFFFFFF:
1432 rawio = self.MockRawIO()
1433 bufio = self.tp(rawio)
1434 self.assertRaises((OverflowError, MemoryError, ValueError),
1435 bufio.__init__, rawio, sys.maxsize)
1436
1437 def test_initialization(self):
1438 rawio = self.MockRawIO([b"abc"])
1439 bufio = self.tp(rawio)
1440 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1441 self.assertRaises(ValueError, bufio.read)
1442 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1443 self.assertRaises(ValueError, bufio.read)
1444 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1445 self.assertRaises(ValueError, bufio.read)
1446
1447 def test_misbehaved_io_read(self):
1448 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1449 bufio = self.tp(rawio)
1450 # _pyio.BufferedReader seems to implement reading different, so that
1451 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001452 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001453
1454 def test_garbage_collection(self):
1455 # C BufferedReader objects are collected.
1456 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001457 with support.check_warnings(('', ResourceWarning)):
1458 rawio = self.FileIO(support.TESTFN, "w+b")
1459 f = self.tp(rawio)
1460 f.f = f
1461 wr = weakref.ref(f)
1462 del f
1463 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001464 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001465
R David Murray67bfe802013-02-23 21:51:05 -05001466 def test_args_error(self):
1467 # Issue #17275
1468 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1469 self.tp(io.BytesIO(), 1024, 1024, 1024)
1470
1471
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001472class PyBufferedReaderTest(BufferedReaderTest):
1473 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001474
Guido van Rossuma9e20242007-03-08 00:43:48 +00001475
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001476class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1477 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001478
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001479 def test_constructor(self):
1480 rawio = self.MockRawIO()
1481 bufio = self.tp(rawio)
1482 bufio.__init__(rawio)
1483 bufio.__init__(rawio, buffer_size=1024)
1484 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001485 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001486 bufio.flush()
1487 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1488 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1489 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1490 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001491 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001492 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001493 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001494
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001495 def test_uninitialized(self):
1496 bufio = self.tp.__new__(self.tp)
1497 del bufio
1498 bufio = self.tp.__new__(self.tp)
1499 self.assertRaisesRegex((ValueError, AttributeError),
1500 'uninitialized|has no attribute',
1501 bufio.write, b'')
1502 bufio.__init__(self.MockRawIO())
1503 self.assertEqual(bufio.write(b''), 0)
1504
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001505 def test_detach_flush(self):
1506 raw = self.MockRawIO()
1507 buf = self.tp(raw)
1508 buf.write(b"howdy!")
1509 self.assertFalse(raw._write_stack)
1510 buf.detach()
1511 self.assertEqual(raw._write_stack, [b"howdy!"])
1512
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001513 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001514 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001515 writer = self.MockRawIO()
1516 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001517 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001518 self.assertFalse(writer._write_stack)
Martin Panter6bb91f32016-05-28 00:41:57 +00001519 buffer = bytearray(b"def")
1520 bufio.write(buffer)
1521 buffer[:] = b"***" # Overwrite our copy of the data
1522 bufio.flush()
1523 self.assertEqual(b"".join(writer._write_stack), b"abcdef")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001524
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001525 def test_write_overflow(self):
1526 writer = self.MockRawIO()
1527 bufio = self.tp(writer, 8)
1528 contents = b"abcdefghijklmnop"
1529 for n in range(0, len(contents), 3):
1530 bufio.write(contents[n:n+3])
1531 flushed = b"".join(writer._write_stack)
1532 # At least (total - 8) bytes were implicitly flushed, perhaps more
1533 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001534 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001535
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001536 def check_writes(self, intermediate_func):
1537 # Lots of writes, test the flushed output is as expected.
1538 contents = bytes(range(256)) * 1000
1539 n = 0
1540 writer = self.MockRawIO()
1541 bufio = self.tp(writer, 13)
1542 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1543 def gen_sizes():
1544 for size in count(1):
1545 for i in range(15):
1546 yield size
1547 sizes = gen_sizes()
1548 while n < len(contents):
1549 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001550 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001551 intermediate_func(bufio)
1552 n += size
1553 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001554 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001555
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001556 def test_writes(self):
1557 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001558
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001559 def test_writes_and_flushes(self):
1560 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001561
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001562 def test_writes_and_seeks(self):
1563 def _seekabs(bufio):
1564 pos = bufio.tell()
1565 bufio.seek(pos + 1, 0)
1566 bufio.seek(pos - 1, 0)
1567 bufio.seek(pos, 0)
1568 self.check_writes(_seekabs)
1569 def _seekrel(bufio):
1570 pos = bufio.seek(0, 1)
1571 bufio.seek(+1, 1)
1572 bufio.seek(-1, 1)
1573 bufio.seek(pos, 0)
1574 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001575
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001576 def test_writes_and_truncates(self):
1577 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001578
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001579 def test_write_non_blocking(self):
1580 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001581 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001582
Ezio Melottib3aedd42010-11-20 19:04:17 +00001583 self.assertEqual(bufio.write(b"abcd"), 4)
1584 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001585 # 1 byte will be written, the rest will be buffered
1586 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001587 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001588
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001589 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1590 raw.block_on(b"0")
1591 try:
1592 bufio.write(b"opqrwxyz0123456789")
1593 except self.BlockingIOError as e:
1594 written = e.characters_written
1595 else:
1596 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001597 self.assertEqual(written, 16)
1598 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001599 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001600
Ezio Melottib3aedd42010-11-20 19:04:17 +00001601 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001602 s = raw.pop_written()
1603 # Previously buffered bytes were flushed
1604 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001605
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001606 def test_write_and_rewind(self):
1607 raw = io.BytesIO()
1608 bufio = self.tp(raw, 4)
1609 self.assertEqual(bufio.write(b"abcdef"), 6)
1610 self.assertEqual(bufio.tell(), 6)
1611 bufio.seek(0, 0)
1612 self.assertEqual(bufio.write(b"XY"), 2)
1613 bufio.seek(6, 0)
1614 self.assertEqual(raw.getvalue(), b"XYcdef")
1615 self.assertEqual(bufio.write(b"123456"), 6)
1616 bufio.flush()
1617 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001618
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001619 def test_flush(self):
1620 writer = self.MockRawIO()
1621 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001622 bufio.write(b"abc")
1623 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001624 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001625
Antoine Pitrou131a4892012-10-16 22:57:11 +02001626 def test_writelines(self):
1627 l = [b'ab', b'cd', b'ef']
1628 writer = self.MockRawIO()
1629 bufio = self.tp(writer, 8)
1630 bufio.writelines(l)
1631 bufio.flush()
1632 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1633
1634 def test_writelines_userlist(self):
1635 l = UserList([b'ab', b'cd', b'ef'])
1636 writer = self.MockRawIO()
1637 bufio = self.tp(writer, 8)
1638 bufio.writelines(l)
1639 bufio.flush()
1640 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1641
1642 def test_writelines_error(self):
1643 writer = self.MockRawIO()
1644 bufio = self.tp(writer, 8)
1645 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1646 self.assertRaises(TypeError, bufio.writelines, None)
1647 self.assertRaises(TypeError, bufio.writelines, 'abc')
1648
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001649 def test_destructor(self):
1650 writer = self.MockRawIO()
1651 bufio = self.tp(writer, 8)
1652 bufio.write(b"abc")
1653 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001654 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001655 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001656
1657 def test_truncate(self):
1658 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001659 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001660 bufio = self.tp(raw, 8)
1661 bufio.write(b"abcdef")
1662 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001663 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001664 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001665 self.assertEqual(f.read(), b"abc")
1666
Victor Stinner45df8202010-04-28 22:31:17 +00001667 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001668 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001669 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001670 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001671 # Write out many bytes from many threads and test they were
1672 # all flushed.
1673 N = 1000
1674 contents = bytes(range(256)) * N
1675 sizes = cycle([1, 19])
1676 n = 0
1677 queue = deque()
1678 while n < len(contents):
1679 size = next(sizes)
1680 queue.append(contents[n:n+size])
1681 n += size
1682 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001683 # We use a real file object because it allows us to
1684 # exercise situations where the GIL is released before
1685 # writing the buffer to the raw streams. This is in addition
1686 # to concurrency issues due to switching threads in the middle
1687 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001688 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001689 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001690 errors = []
1691 def f():
1692 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001693 while True:
1694 try:
1695 s = queue.popleft()
1696 except IndexError:
1697 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001698 bufio.write(s)
1699 except Exception as e:
1700 errors.append(e)
1701 raise
1702 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001703 with support.start_threads(threads):
1704 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001705 self.assertFalse(errors,
1706 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001707 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001708 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001709 s = f.read()
1710 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001711 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001712 finally:
1713 support.unlink(support.TESTFN)
1714
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001715 def test_misbehaved_io(self):
1716 rawio = self.MisbehavedRawIO()
1717 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001718 self.assertRaises(OSError, bufio.seek, 0)
1719 self.assertRaises(OSError, bufio.tell)
1720 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001721
Florent Xicluna109d5732012-07-07 17:03:22 +02001722 def test_max_buffer_size_removal(self):
1723 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001724 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001725
Benjamin Peterson68623612012-12-20 11:53:11 -06001726 def test_write_error_on_close(self):
1727 raw = self.MockRawIO()
1728 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001729 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001730 raw.write = bad_write
1731 b = self.tp(raw)
1732 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001733 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001734 self.assertTrue(b.closed)
1735
Benjamin Peterson59406a92009-03-26 17:10:29 +00001736
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001737class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001738 tp = io.BufferedWriter
1739
1740 def test_constructor(self):
1741 BufferedWriterTest.test_constructor(self)
1742 # The allocation can succeed on 32-bit builds, e.g. with more
1743 # than 2GB RAM and a 64-bit kernel.
1744 if sys.maxsize > 0x7FFFFFFF:
1745 rawio = self.MockRawIO()
1746 bufio = self.tp(rawio)
1747 self.assertRaises((OverflowError, MemoryError, ValueError),
1748 bufio.__init__, rawio, sys.maxsize)
1749
1750 def test_initialization(self):
1751 rawio = self.MockRawIO()
1752 bufio = self.tp(rawio)
1753 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1754 self.assertRaises(ValueError, bufio.write, b"def")
1755 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1756 self.assertRaises(ValueError, bufio.write, b"def")
1757 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1758 self.assertRaises(ValueError, bufio.write, b"def")
1759
1760 def test_garbage_collection(self):
1761 # C BufferedWriter objects are collected, and collecting them flushes
1762 # all data to disk.
1763 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001764 with support.check_warnings(('', ResourceWarning)):
1765 rawio = self.FileIO(support.TESTFN, "w+b")
1766 f = self.tp(rawio)
1767 f.write(b"123xxx")
1768 f.x = f
1769 wr = weakref.ref(f)
1770 del f
1771 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001772 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001773 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001774 self.assertEqual(f.read(), b"123xxx")
1775
R David Murray67bfe802013-02-23 21:51:05 -05001776 def test_args_error(self):
1777 # Issue #17275
1778 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1779 self.tp(io.BytesIO(), 1024, 1024, 1024)
1780
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001781
1782class PyBufferedWriterTest(BufferedWriterTest):
1783 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001784
Guido van Rossum01a27522007-03-07 01:00:12 +00001785class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001786
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001787 def test_constructor(self):
1788 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001789 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001790
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001791 def test_uninitialized(self):
1792 pair = self.tp.__new__(self.tp)
1793 del pair
1794 pair = self.tp.__new__(self.tp)
1795 self.assertRaisesRegex((ValueError, AttributeError),
1796 'uninitialized|has no attribute',
1797 pair.read, 0)
1798 self.assertRaisesRegex((ValueError, AttributeError),
1799 'uninitialized|has no attribute',
1800 pair.write, b'')
1801 pair.__init__(self.MockRawIO(), self.MockRawIO())
1802 self.assertEqual(pair.read(0), b'')
1803 self.assertEqual(pair.write(b''), 0)
1804
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001805 def test_detach(self):
1806 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1807 self.assertRaises(self.UnsupportedOperation, pair.detach)
1808
Florent Xicluna109d5732012-07-07 17:03:22 +02001809 def test_constructor_max_buffer_size_removal(self):
1810 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001811 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001812
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001813 def test_constructor_with_not_readable(self):
1814 class NotReadable(MockRawIO):
1815 def readable(self):
1816 return False
1817
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001818 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001819
1820 def test_constructor_with_not_writeable(self):
1821 class NotWriteable(MockRawIO):
1822 def writable(self):
1823 return False
1824
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001825 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001826
1827 def test_read(self):
1828 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1829
1830 self.assertEqual(pair.read(3), b"abc")
1831 self.assertEqual(pair.read(1), b"d")
1832 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001833 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1834 self.assertEqual(pair.read(None), b"abc")
1835
1836 def test_readlines(self):
1837 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1838 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1839 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1840 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001841
1842 def test_read1(self):
1843 # .read1() is delegated to the underlying reader object, so this test
1844 # can be shallow.
1845 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1846
1847 self.assertEqual(pair.read1(3), b"abc")
Martin Panterccb2c0e2016-10-20 23:48:14 +00001848 self.assertEqual(pair.read1(), b"def")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001849
1850 def test_readinto(self):
Martin Panter6bb91f32016-05-28 00:41:57 +00001851 for method in ("readinto", "readinto1"):
1852 with self.subTest(method):
1853 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001854
Serhiy Storchaka5f1a5182016-09-11 14:41:02 +03001855 data = byteslike(b'\0' * 5)
Martin Panter6bb91f32016-05-28 00:41:57 +00001856 self.assertEqual(getattr(pair, method)(data), 5)
1857 self.assertEqual(bytes(data), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001858
1859 def test_write(self):
1860 w = self.MockRawIO()
1861 pair = self.tp(self.MockRawIO(), w)
1862
1863 pair.write(b"abc")
1864 pair.flush()
Martin Panter6bb91f32016-05-28 00:41:57 +00001865 buffer = bytearray(b"def")
1866 pair.write(buffer)
1867 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001868 pair.flush()
1869 self.assertEqual(w._write_stack, [b"abc", b"def"])
1870
1871 def test_peek(self):
1872 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1873
1874 self.assertTrue(pair.peek(3).startswith(b"abc"))
1875 self.assertEqual(pair.read(3), b"abc")
1876
1877 def test_readable(self):
1878 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1879 self.assertTrue(pair.readable())
1880
1881 def test_writeable(self):
1882 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1883 self.assertTrue(pair.writable())
1884
1885 def test_seekable(self):
1886 # BufferedRWPairs are never seekable, even if their readers and writers
1887 # are.
1888 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1889 self.assertFalse(pair.seekable())
1890
1891 # .flush() is delegated to the underlying writer object and has been
1892 # tested in the test_write method.
1893
1894 def test_close_and_closed(self):
1895 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1896 self.assertFalse(pair.closed)
1897 pair.close()
1898 self.assertTrue(pair.closed)
1899
Serhiy Storchaka7665be62015-03-24 23:21:57 +02001900 def test_reader_close_error_on_close(self):
1901 def reader_close():
1902 reader_non_existing
1903 reader = self.MockRawIO()
1904 reader.close = reader_close
1905 writer = self.MockRawIO()
1906 pair = self.tp(reader, writer)
1907 with self.assertRaises(NameError) as err:
1908 pair.close()
1909 self.assertIn('reader_non_existing', str(err.exception))
1910 self.assertTrue(pair.closed)
1911 self.assertFalse(reader.closed)
1912 self.assertTrue(writer.closed)
1913
1914 def test_writer_close_error_on_close(self):
1915 def writer_close():
1916 writer_non_existing
1917 reader = self.MockRawIO()
1918 writer = self.MockRawIO()
1919 writer.close = writer_close
1920 pair = self.tp(reader, writer)
1921 with self.assertRaises(NameError) as err:
1922 pair.close()
1923 self.assertIn('writer_non_existing', str(err.exception))
1924 self.assertFalse(pair.closed)
1925 self.assertTrue(reader.closed)
1926 self.assertFalse(writer.closed)
1927
1928 def test_reader_writer_close_error_on_close(self):
1929 def reader_close():
1930 reader_non_existing
1931 def writer_close():
1932 writer_non_existing
1933 reader = self.MockRawIO()
1934 reader.close = reader_close
1935 writer = self.MockRawIO()
1936 writer.close = writer_close
1937 pair = self.tp(reader, writer)
1938 with self.assertRaises(NameError) as err:
1939 pair.close()
1940 self.assertIn('reader_non_existing', str(err.exception))
1941 self.assertIsInstance(err.exception.__context__, NameError)
1942 self.assertIn('writer_non_existing', str(err.exception.__context__))
1943 self.assertFalse(pair.closed)
1944 self.assertFalse(reader.closed)
1945 self.assertFalse(writer.closed)
1946
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001947 def test_isatty(self):
1948 class SelectableIsAtty(MockRawIO):
1949 def __init__(self, isatty):
1950 MockRawIO.__init__(self)
1951 self._isatty = isatty
1952
1953 def isatty(self):
1954 return self._isatty
1955
1956 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1957 self.assertFalse(pair.isatty())
1958
1959 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1960 self.assertTrue(pair.isatty())
1961
1962 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1963 self.assertTrue(pair.isatty())
1964
1965 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1966 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001967
Benjamin Petersonbbd0a322014-09-29 22:46:57 -04001968 def test_weakref_clearing(self):
1969 brw = self.tp(self.MockRawIO(), self.MockRawIO())
1970 ref = weakref.ref(brw)
1971 brw = None
1972 ref = None # Shouldn't segfault.
1973
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001974class CBufferedRWPairTest(BufferedRWPairTest):
1975 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001976
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001977class PyBufferedRWPairTest(BufferedRWPairTest):
1978 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001979
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001980
1981class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1982 read_mode = "rb+"
1983 write_mode = "wb+"
1984
1985 def test_constructor(self):
1986 BufferedReaderTest.test_constructor(self)
1987 BufferedWriterTest.test_constructor(self)
1988
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001989 def test_uninitialized(self):
1990 BufferedReaderTest.test_uninitialized(self)
1991 BufferedWriterTest.test_uninitialized(self)
1992
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001993 def test_read_and_write(self):
1994 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001995 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001996
1997 self.assertEqual(b"as", rw.read(2))
1998 rw.write(b"ddd")
1999 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002000 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002001 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002002 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00002003
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002004 def test_seek_and_tell(self):
2005 raw = self.BytesIO(b"asdfghjkl")
2006 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00002007
Ezio Melottib3aedd42010-11-20 19:04:17 +00002008 self.assertEqual(b"as", rw.read(2))
2009 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002010 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002011 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00002012
Antoine Pitroue05565e2011-08-20 14:39:23 +02002013 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00002014 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02002015 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002016 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002017 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002018 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002019 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002020 self.assertEqual(7, rw.tell())
2021 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02002022 rw.flush()
2023 self.assertEqual(b"asdf123fl", raw.getvalue())
2024
Christian Heimes8e42a0a2007-11-08 18:04:45 +00002025 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00002026
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002027 def check_flush_and_read(self, read_func):
2028 raw = self.BytesIO(b"abcdefghi")
2029 bufio = self.tp(raw)
2030
Ezio Melottib3aedd42010-11-20 19:04:17 +00002031 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002032 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002033 self.assertEqual(b"ef", read_func(bufio, 2))
2034 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002035 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002036 self.assertEqual(6, bufio.tell())
2037 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002038 raw.seek(0, 0)
2039 raw.write(b"XYZ")
2040 # flush() resets the read buffer
2041 bufio.flush()
2042 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002043 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002044
2045 def test_flush_and_read(self):
2046 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2047
2048 def test_flush_and_readinto(self):
2049 def _readinto(bufio, n=-1):
2050 b = bytearray(n if n >= 0 else 9999)
2051 n = bufio.readinto(b)
2052 return bytes(b[:n])
2053 self.check_flush_and_read(_readinto)
2054
2055 def test_flush_and_peek(self):
2056 def _peek(bufio, n=-1):
2057 # This relies on the fact that the buffer can contain the whole
2058 # raw stream, otherwise peek() can return less.
2059 b = bufio.peek(n)
2060 if n != -1:
2061 b = b[:n]
2062 bufio.seek(len(b), 1)
2063 return b
2064 self.check_flush_and_read(_peek)
2065
2066 def test_flush_and_write(self):
2067 raw = self.BytesIO(b"abcdefghi")
2068 bufio = self.tp(raw)
2069
2070 bufio.write(b"123")
2071 bufio.flush()
2072 bufio.write(b"45")
2073 bufio.flush()
2074 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002075 self.assertEqual(b"12345fghi", raw.getvalue())
2076 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002077
2078 def test_threads(self):
2079 BufferedReaderTest.test_threads(self)
2080 BufferedWriterTest.test_threads(self)
2081
2082 def test_writes_and_peek(self):
2083 def _peek(bufio):
2084 bufio.peek(1)
2085 self.check_writes(_peek)
2086 def _peek(bufio):
2087 pos = bufio.tell()
2088 bufio.seek(-1, 1)
2089 bufio.peek(1)
2090 bufio.seek(pos, 0)
2091 self.check_writes(_peek)
2092
2093 def test_writes_and_reads(self):
2094 def _read(bufio):
2095 bufio.seek(-1, 1)
2096 bufio.read(1)
2097 self.check_writes(_read)
2098
2099 def test_writes_and_read1s(self):
2100 def _read1(bufio):
2101 bufio.seek(-1, 1)
2102 bufio.read1(1)
2103 self.check_writes(_read1)
2104
2105 def test_writes_and_readintos(self):
2106 def _read(bufio):
2107 bufio.seek(-1, 1)
2108 bufio.readinto(bytearray(1))
2109 self.check_writes(_read)
2110
Antoine Pitroua0ceb732009-08-06 20:29:56 +00002111 def test_write_after_readahead(self):
2112 # Issue #6629: writing after the buffer was filled by readahead should
2113 # first rewind the raw stream.
2114 for overwrite_size in [1, 5]:
2115 raw = self.BytesIO(b"A" * 10)
2116 bufio = self.tp(raw, 4)
2117 # Trigger readahead
2118 self.assertEqual(bufio.read(1), b"A")
2119 self.assertEqual(bufio.tell(), 1)
2120 # Overwriting should rewind the raw stream if it needs so
2121 bufio.write(b"B" * overwrite_size)
2122 self.assertEqual(bufio.tell(), overwrite_size + 1)
2123 # If the write size was smaller than the buffer size, flush() and
2124 # check that rewind happens.
2125 bufio.flush()
2126 self.assertEqual(bufio.tell(), overwrite_size + 1)
2127 s = raw.getvalue()
2128 self.assertEqual(s,
2129 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2130
Antoine Pitrou7c404892011-05-13 00:13:33 +02002131 def test_write_rewind_write(self):
2132 # Various combinations of reading / writing / seeking backwards / writing again
2133 def mutate(bufio, pos1, pos2):
2134 assert pos2 >= pos1
2135 # Fill the buffer
2136 bufio.seek(pos1)
2137 bufio.read(pos2 - pos1)
2138 bufio.write(b'\x02')
2139 # This writes earlier than the previous write, but still inside
2140 # the buffer.
2141 bufio.seek(pos1)
2142 bufio.write(b'\x01')
2143
2144 b = b"\x80\x81\x82\x83\x84"
2145 for i in range(0, len(b)):
2146 for j in range(i, len(b)):
2147 raw = self.BytesIO(b)
2148 bufio = self.tp(raw, 100)
2149 mutate(bufio, i, j)
2150 bufio.flush()
2151 expected = bytearray(b)
2152 expected[j] = 2
2153 expected[i] = 1
2154 self.assertEqual(raw.getvalue(), expected,
2155 "failed result for i=%d, j=%d" % (i, j))
2156
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00002157 def test_truncate_after_read_or_write(self):
2158 raw = self.BytesIO(b"A" * 10)
2159 bufio = self.tp(raw, 100)
2160 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2161 self.assertEqual(bufio.truncate(), 2)
2162 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2163 self.assertEqual(bufio.truncate(), 4)
2164
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002165 def test_misbehaved_io(self):
2166 BufferedReaderTest.test_misbehaved_io(self)
2167 BufferedWriterTest.test_misbehaved_io(self)
2168
Antoine Pitroue05565e2011-08-20 14:39:23 +02002169 def test_interleaved_read_write(self):
2170 # Test for issue #12213
2171 with self.BytesIO(b'abcdefgh') as raw:
2172 with self.tp(raw, 100) as f:
2173 f.write(b"1")
2174 self.assertEqual(f.read(1), b'b')
2175 f.write(b'2')
2176 self.assertEqual(f.read1(1), b'd')
2177 f.write(b'3')
2178 buf = bytearray(1)
2179 f.readinto(buf)
2180 self.assertEqual(buf, b'f')
2181 f.write(b'4')
2182 self.assertEqual(f.peek(1), b'h')
2183 f.flush()
2184 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2185
2186 with self.BytesIO(b'abc') as raw:
2187 with self.tp(raw, 100) as f:
2188 self.assertEqual(f.read(1), b'a')
2189 f.write(b"2")
2190 self.assertEqual(f.read(1), b'c')
2191 f.flush()
2192 self.assertEqual(raw.getvalue(), b'a2c')
2193
2194 def test_interleaved_readline_write(self):
2195 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2196 with self.tp(raw) as f:
2197 f.write(b'1')
2198 self.assertEqual(f.readline(), b'b\n')
2199 f.write(b'2')
2200 self.assertEqual(f.readline(), b'def\n')
2201 f.write(b'3')
2202 self.assertEqual(f.readline(), b'\n')
2203 f.flush()
2204 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2205
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002206 # You can't construct a BufferedRandom over a non-seekable stream.
2207 test_unseekable = None
2208
R David Murray67bfe802013-02-23 21:51:05 -05002209
Antoine Pitrou10f0c502012-07-29 19:02:46 +02002210class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002211 tp = io.BufferedRandom
2212
2213 def test_constructor(self):
2214 BufferedRandomTest.test_constructor(self)
2215 # The allocation can succeed on 32-bit builds, e.g. with more
2216 # than 2GB RAM and a 64-bit kernel.
2217 if sys.maxsize > 0x7FFFFFFF:
2218 rawio = self.MockRawIO()
2219 bufio = self.tp(rawio)
2220 self.assertRaises((OverflowError, MemoryError, ValueError),
2221 bufio.__init__, rawio, sys.maxsize)
2222
2223 def test_garbage_collection(self):
2224 CBufferedReaderTest.test_garbage_collection(self)
2225 CBufferedWriterTest.test_garbage_collection(self)
2226
R David Murray67bfe802013-02-23 21:51:05 -05002227 def test_args_error(self):
2228 # Issue #17275
2229 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
2230 self.tp(io.BytesIO(), 1024, 1024, 1024)
2231
2232
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002233class PyBufferedRandomTest(BufferedRandomTest):
2234 tp = pyio.BufferedRandom
2235
2236
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002237# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2238# properties:
2239# - A single output character can correspond to many bytes of input.
2240# - The number of input bytes to complete the character can be
2241# undetermined until the last input byte is received.
2242# - The number of input bytes can vary depending on previous input.
2243# - A single input byte can correspond to many characters of output.
2244# - The number of output characters can be undetermined until the
2245# last input byte is received.
2246# - The number of output characters can vary depending on previous input.
2247
2248class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
2249 """
2250 For testing seek/tell behavior with a stateful, buffering decoder.
2251
2252 Input is a sequence of words. Words may be fixed-length (length set
2253 by input) or variable-length (period-terminated). In variable-length
2254 mode, extra periods are ignored. Possible words are:
2255 - 'i' followed by a number sets the input length, I (maximum 99).
2256 When I is set to 0, words are space-terminated.
2257 - 'o' followed by a number sets the output length, O (maximum 99).
2258 - Any other word is converted into a word followed by a period on
2259 the output. The output word consists of the input word truncated
2260 or padded out with hyphens to make its length equal to O. If O
2261 is 0, the word is output verbatim without truncating or padding.
2262 I and O are initially set to 1. When I changes, any buffered input is
2263 re-scanned according to the new I. EOF also terminates the last word.
2264 """
2265
2266 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00002267 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002268 self.reset()
2269
2270 def __repr__(self):
2271 return '<SID %x>' % id(self)
2272
2273 def reset(self):
2274 self.i = 1
2275 self.o = 1
2276 self.buffer = bytearray()
2277
2278 def getstate(self):
2279 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
2280 return bytes(self.buffer), i*100 + o
2281
2282 def setstate(self, state):
2283 buffer, io = state
2284 self.buffer = bytearray(buffer)
2285 i, o = divmod(io, 100)
2286 self.i, self.o = i ^ 1, o ^ 1
2287
2288 def decode(self, input, final=False):
2289 output = ''
2290 for b in input:
2291 if self.i == 0: # variable-length, terminated with period
2292 if b == ord('.'):
2293 if self.buffer:
2294 output += self.process_word()
2295 else:
2296 self.buffer.append(b)
2297 else: # fixed-length, terminate after self.i bytes
2298 self.buffer.append(b)
2299 if len(self.buffer) == self.i:
2300 output += self.process_word()
2301 if final and self.buffer: # EOF terminates the last word
2302 output += self.process_word()
2303 return output
2304
2305 def process_word(self):
2306 output = ''
2307 if self.buffer[0] == ord('i'):
2308 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2309 elif self.buffer[0] == ord('o'):
2310 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2311 else:
2312 output = self.buffer.decode('ascii')
2313 if len(output) < self.o:
2314 output += '-'*self.o # pad out with hyphens
2315 if self.o:
2316 output = output[:self.o] # truncate to output length
2317 output += '.'
2318 self.buffer = bytearray()
2319 return output
2320
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002321 codecEnabled = False
2322
2323 @classmethod
2324 def lookupTestDecoder(cls, name):
2325 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00002326 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002327 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00002328 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002329 incrementalencoder=None,
2330 streamreader=None, streamwriter=None,
2331 incrementaldecoder=cls)
2332
2333# Register the previous decoder for testing.
2334# Disabled by default, tests will enable it.
2335codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2336
2337
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002338class StatefulIncrementalDecoderTest(unittest.TestCase):
2339 """
2340 Make sure the StatefulIncrementalDecoder actually works.
2341 """
2342
2343 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002344 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002345 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002346 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002347 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002348 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002349 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002350 # I=0, O=6 (variable-length input, fixed-length output)
2351 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2352 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002353 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002354 # I=6, O=3 (fixed-length input > fixed-length output)
2355 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2356 # I=0, then 3; O=29, then 15 (with longer output)
2357 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2358 'a----------------------------.' +
2359 'b----------------------------.' +
2360 'cde--------------------------.' +
2361 'abcdefghijabcde.' +
2362 'a.b------------.' +
2363 '.c.------------.' +
2364 'd.e------------.' +
2365 'k--------------.' +
2366 'l--------------.' +
2367 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002368 ]
2369
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002370 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002371 # Try a few one-shot test cases.
2372 for input, eof, output in self.test_cases:
2373 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002374 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002375
2376 # Also test an unfinished decode, followed by forcing EOF.
2377 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002378 self.assertEqual(d.decode(b'oiabcd'), '')
2379 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002380
2381class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002382
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002383 def setUp(self):
2384 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2385 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002386 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002387
Guido van Rossumd0712812007-04-11 16:32:43 +00002388 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002389 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002390
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002391 def test_constructor(self):
2392 r = self.BytesIO(b"\xc3\xa9\n\n")
2393 b = self.BufferedReader(r, 1000)
2394 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002395 t.__init__(b, encoding="latin-1", newline="\r\n")
2396 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002397 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002398 t.__init__(b, encoding="utf-8", line_buffering=True)
2399 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002400 self.assertEqual(t.line_buffering, True)
2401 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002402 self.assertRaises(TypeError, t.__init__, b, newline=42)
2403 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2404
Serhiy Storchakaf24131f2015-04-16 11:19:43 +03002405 def test_uninitialized(self):
2406 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2407 del t
2408 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2409 self.assertRaises(Exception, repr, t)
2410 self.assertRaisesRegex((ValueError, AttributeError),
2411 'uninitialized|has no attribute',
2412 t.read, 0)
2413 t.__init__(self.MockRawIO())
2414 self.assertEqual(t.read(0), '')
2415
Nick Coghlana9b15242014-02-04 22:11:18 +10002416 def test_non_text_encoding_codecs_are_rejected(self):
2417 # Ensure the constructor complains if passed a codec that isn't
2418 # marked as a text encoding
2419 # http://bugs.python.org/issue20404
2420 r = self.BytesIO()
2421 b = self.BufferedWriter(r)
2422 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2423 self.TextIOWrapper(b, encoding="hex")
2424
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002425 def test_detach(self):
2426 r = self.BytesIO()
2427 b = self.BufferedWriter(r)
2428 t = self.TextIOWrapper(b)
2429 self.assertIs(t.detach(), b)
2430
2431 t = self.TextIOWrapper(b, encoding="ascii")
2432 t.write("howdy")
2433 self.assertFalse(r.getvalue())
2434 t.detach()
2435 self.assertEqual(r.getvalue(), b"howdy")
2436 self.assertRaises(ValueError, t.detach)
2437
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002438 # Operations independent of the detached stream should still work
2439 repr(t)
2440 self.assertEqual(t.encoding, "ascii")
2441 self.assertEqual(t.errors, "strict")
2442 self.assertFalse(t.line_buffering)
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002443 self.assertFalse(t.write_through)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002444
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002445 def test_repr(self):
2446 raw = self.BytesIO("hello".encode("utf-8"))
2447 b = self.BufferedReader(raw)
2448 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002449 modname = self.TextIOWrapper.__module__
2450 self.assertEqual(repr(t),
2451 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2452 raw.name = "dummy"
2453 self.assertEqual(repr(t),
2454 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002455 t.mode = "r"
2456 self.assertEqual(repr(t),
2457 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002458 raw.name = b"dummy"
2459 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002460 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002461
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002462 t.buffer.detach()
2463 repr(t) # Should not raise an exception
2464
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02002465 def test_recursive_repr(self):
2466 # Issue #25455
2467 raw = self.BytesIO()
2468 t = self.TextIOWrapper(raw)
2469 with support.swap_attr(raw, 'name', t):
2470 try:
2471 repr(t) # Should not crash
2472 except RuntimeError:
2473 pass
2474
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002475 def test_line_buffering(self):
2476 r = self.BytesIO()
2477 b = self.BufferedWriter(r, 1000)
2478 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002479 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002480 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002481 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002482 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002483 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002484 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002485
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002486 def test_reconfigure_line_buffering(self):
2487 r = self.BytesIO()
2488 b = self.BufferedWriter(r, 1000)
2489 t = self.TextIOWrapper(b, newline="\n", line_buffering=False)
2490 t.write("AB\nC")
2491 self.assertEqual(r.getvalue(), b"")
2492
2493 t.reconfigure(line_buffering=True) # implicit flush
2494 self.assertEqual(r.getvalue(), b"AB\nC")
2495 t.write("DEF\nG")
2496 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2497 t.write("H")
2498 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2499 t.reconfigure(line_buffering=False) # implicit flush
2500 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2501 t.write("IJ")
2502 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2503
2504 # Keeping default value
2505 t.reconfigure()
2506 t.reconfigure(line_buffering=None)
2507 self.assertEqual(t.line_buffering, False)
2508 t.reconfigure(line_buffering=True)
2509 t.reconfigure()
2510 t.reconfigure(line_buffering=None)
2511 self.assertEqual(t.line_buffering, True)
2512
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002513 def test_default_encoding(self):
2514 old_environ = dict(os.environ)
2515 try:
2516 # try to get a user preferred encoding different than the current
2517 # locale encoding to check that TextIOWrapper() uses the current
2518 # locale encoding and not the user preferred encoding
2519 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2520 if key in os.environ:
2521 del os.environ[key]
2522
2523 current_locale_encoding = locale.getpreferredencoding(False)
2524 b = self.BytesIO()
2525 t = self.TextIOWrapper(b)
2526 self.assertEqual(t.encoding, current_locale_encoding)
2527 finally:
2528 os.environ.clear()
2529 os.environ.update(old_environ)
2530
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002531 @support.cpython_only
Serhiy Storchaka78980432013-01-15 01:12:17 +02002532 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002533 # Issue 15989
2534 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002535 b = self.BytesIO()
2536 b.fileno = lambda: _testcapi.INT_MAX + 1
2537 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2538 b.fileno = lambda: _testcapi.UINT_MAX + 1
2539 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2540
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002541 def test_encoding(self):
2542 # Check the encoding attribute is always set, and valid
2543 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002544 t = self.TextIOWrapper(b, encoding="utf-8")
2545 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002546 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002547 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002548 codecs.lookup(t.encoding)
2549
2550 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002551 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002552 b = self.BytesIO(b"abc\n\xff\n")
2553 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002554 self.assertRaises(UnicodeError, t.read)
2555 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002556 b = self.BytesIO(b"abc\n\xff\n")
2557 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002558 self.assertRaises(UnicodeError, t.read)
2559 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002560 b = self.BytesIO(b"abc\n\xff\n")
2561 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002562 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002563 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002564 b = self.BytesIO(b"abc\n\xff\n")
2565 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002566 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002567
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002568 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002569 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002570 b = self.BytesIO()
2571 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002572 self.assertRaises(UnicodeError, t.write, "\xff")
2573 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002574 b = self.BytesIO()
2575 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002576 self.assertRaises(UnicodeError, t.write, "\xff")
2577 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002578 b = self.BytesIO()
2579 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002580 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002581 t.write("abc\xffdef\n")
2582 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002583 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002584 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002585 b = self.BytesIO()
2586 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002587 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002588 t.write("abc\xffdef\n")
2589 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002590 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002591
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002592 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002593 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2594
2595 tests = [
2596 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002597 [ '', input_lines ],
2598 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2599 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2600 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002601 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002602 encodings = (
2603 'utf-8', 'latin-1',
2604 'utf-16', 'utf-16-le', 'utf-16-be',
2605 'utf-32', 'utf-32-le', 'utf-32-be',
2606 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002607
Guido van Rossum8358db22007-08-18 21:39:55 +00002608 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002609 # character in TextIOWrapper._pending_line.
2610 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002611 # XXX: str.encode() should return bytes
2612 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002613 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002614 for bufsize in range(1, 10):
2615 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002616 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2617 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002618 encoding=encoding)
2619 if do_reads:
2620 got_lines = []
2621 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002622 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002623 if c2 == '':
2624 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002625 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002626 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002627 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002628 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002629
2630 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002631 self.assertEqual(got_line, exp_line)
2632 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002633
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002634 def test_newlines_input(self):
2635 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002636 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2637 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002638 (None, normalized.decode("ascii").splitlines(keepends=True)),
2639 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002640 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2641 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2642 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002643 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002644 buf = self.BytesIO(testdata)
2645 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002646 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002647 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002648 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002649
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002650 def test_newlines_output(self):
2651 testdict = {
2652 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2653 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2654 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2655 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2656 }
2657 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2658 for newline, expected in tests:
2659 buf = self.BytesIO()
2660 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2661 txt.write("AAA\nB")
2662 txt.write("BB\nCCC\n")
2663 txt.write("X\rY\r\nZ")
2664 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002665 self.assertEqual(buf.closed, False)
2666 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002667
2668 def test_destructor(self):
2669 l = []
2670 base = self.BytesIO
2671 class MyBytesIO(base):
2672 def close(self):
2673 l.append(self.getvalue())
2674 base.close(self)
2675 b = MyBytesIO()
2676 t = self.TextIOWrapper(b, encoding="ascii")
2677 t.write("abc")
2678 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002679 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002680 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002681
2682 def test_override_destructor(self):
2683 record = []
2684 class MyTextIO(self.TextIOWrapper):
2685 def __del__(self):
2686 record.append(1)
2687 try:
2688 f = super().__del__
2689 except AttributeError:
2690 pass
2691 else:
2692 f()
2693 def close(self):
2694 record.append(2)
2695 super().close()
2696 def flush(self):
2697 record.append(3)
2698 super().flush()
2699 b = self.BytesIO()
2700 t = MyTextIO(b, encoding="ascii")
2701 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002702 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002703 self.assertEqual(record, [1, 2, 3])
2704
2705 def test_error_through_destructor(self):
2706 # Test that the exception state is not modified by a destructor,
2707 # even if close() fails.
2708 rawio = self.CloseFailureIO()
2709 def f():
2710 self.TextIOWrapper(rawio).xyzzy
2711 with support.captured_output("stderr") as s:
2712 self.assertRaises(AttributeError, f)
2713 s = s.getvalue().strip()
2714 if s:
2715 # The destructor *may* have printed an unraisable error, check it
2716 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002717 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002718 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002719
Guido van Rossum9b76da62007-04-11 01:09:03 +00002720 # Systematic tests of the text I/O API
2721
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002722 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002723 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002724 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002725 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002726 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002727 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002728 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002729 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002730 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002731 self.assertEqual(f.tell(), 0)
2732 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002733 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002734 self.assertEqual(f.seek(0), 0)
2735 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002736 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002737 self.assertEqual(f.read(2), "ab")
2738 self.assertEqual(f.read(1), "c")
2739 self.assertEqual(f.read(1), "")
2740 self.assertEqual(f.read(), "")
2741 self.assertEqual(f.tell(), cookie)
2742 self.assertEqual(f.seek(0), 0)
2743 self.assertEqual(f.seek(0, 2), cookie)
2744 self.assertEqual(f.write("def"), 3)
2745 self.assertEqual(f.seek(cookie), cookie)
2746 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002747 if enc.startswith("utf"):
2748 self.multi_line_test(f, enc)
2749 f.close()
2750
2751 def multi_line_test(self, f, enc):
2752 f.seek(0)
2753 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002754 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002755 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002756 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002757 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002758 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002759 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002760 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002761 wlines.append((f.tell(), line))
2762 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002763 f.seek(0)
2764 rlines = []
2765 while True:
2766 pos = f.tell()
2767 line = f.readline()
2768 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002769 break
2770 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002771 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002772
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002773 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002774 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002775 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002776 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002777 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002778 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002779 p2 = f.tell()
2780 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002781 self.assertEqual(f.tell(), p0)
2782 self.assertEqual(f.readline(), "\xff\n")
2783 self.assertEqual(f.tell(), p1)
2784 self.assertEqual(f.readline(), "\xff\n")
2785 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002786 f.seek(0)
2787 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002788 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002789 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002790 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002791 f.close()
2792
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002793 def test_seeking(self):
2794 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002795 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002796 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002797 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002798 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002799 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002800 suffix = bytes(u_suffix.encode("utf-8"))
2801 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002802 with self.open(support.TESTFN, "wb") as f:
2803 f.write(line*2)
2804 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2805 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002806 self.assertEqual(s, str(prefix, "ascii"))
2807 self.assertEqual(f.tell(), prefix_size)
2808 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002809
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002810 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002811 # Regression test for a specific bug
2812 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002813 with self.open(support.TESTFN, "wb") as f:
2814 f.write(data)
2815 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2816 f._CHUNK_SIZE # Just test that it exists
2817 f._CHUNK_SIZE = 2
2818 f.readline()
2819 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002820
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002821 def test_seek_and_tell(self):
2822 #Test seek/tell using the StatefulIncrementalDecoder.
2823 # Make test faster by doing smaller seeks
2824 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002825
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002826 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002827 """Tell/seek to various points within a data stream and ensure
2828 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002829 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002830 f.write(data)
2831 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002832 f = self.open(support.TESTFN, encoding='test_decoder')
2833 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002834 decoded = f.read()
2835 f.close()
2836
Neal Norwitze2b07052008-03-18 19:52:05 +00002837 for i in range(min_pos, len(decoded) + 1): # seek positions
2838 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002839 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002840 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002841 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002842 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002843 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002844 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002845 f.close()
2846
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002847 # Enable the test decoder.
2848 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002849
2850 # Run the tests.
2851 try:
2852 # Try each test case.
2853 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002854 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002855
2856 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002857 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2858 offset = CHUNK_SIZE - len(input)//2
2859 prefix = b'.'*offset
2860 # Don't bother seeking into the prefix (takes too long).
2861 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002862 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002863
2864 # Ensure our test decoder won't interfere with subsequent tests.
2865 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002866 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002867
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002868 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002869 data = "1234567890"
2870 tests = ("utf-16",
2871 "utf-16-le",
2872 "utf-16-be",
2873 "utf-32",
2874 "utf-32-le",
2875 "utf-32-be")
2876 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002877 buf = self.BytesIO()
2878 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002879 # Check if the BOM is written only once (see issue1753).
2880 f.write(data)
2881 f.write(data)
2882 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002883 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002884 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002885 self.assertEqual(f.read(), data * 2)
2886 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002887
Benjamin Petersona1b49012009-03-31 23:11:32 +00002888 def test_unreadable(self):
2889 class UnReadable(self.BytesIO):
2890 def readable(self):
2891 return False
2892 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002893 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002894
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002895 def test_read_one_by_one(self):
2896 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002897 reads = ""
2898 while True:
2899 c = txt.read(1)
2900 if not c:
2901 break
2902 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002903 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002904
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002905 def test_readlines(self):
2906 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2907 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2908 txt.seek(0)
2909 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2910 txt.seek(0)
2911 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2912
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002913 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002914 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002915 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002916 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002917 reads = ""
2918 while True:
2919 c = txt.read(128)
2920 if not c:
2921 break
2922 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002923 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002924
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002925 def test_writelines(self):
2926 l = ['ab', 'cd', 'ef']
2927 buf = self.BytesIO()
2928 txt = self.TextIOWrapper(buf)
2929 txt.writelines(l)
2930 txt.flush()
2931 self.assertEqual(buf.getvalue(), b'abcdef')
2932
2933 def test_writelines_userlist(self):
2934 l = UserList(['ab', 'cd', 'ef'])
2935 buf = self.BytesIO()
2936 txt = self.TextIOWrapper(buf)
2937 txt.writelines(l)
2938 txt.flush()
2939 self.assertEqual(buf.getvalue(), b'abcdef')
2940
2941 def test_writelines_error(self):
2942 txt = self.TextIOWrapper(self.BytesIO())
2943 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2944 self.assertRaises(TypeError, txt.writelines, None)
2945 self.assertRaises(TypeError, txt.writelines, b'abc')
2946
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002947 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002948 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002949
2950 # read one char at a time
2951 reads = ""
2952 while True:
2953 c = txt.read(1)
2954 if not c:
2955 break
2956 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002957 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002958
2959 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002960 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002961 txt._CHUNK_SIZE = 4
2962
2963 reads = ""
2964 while True:
2965 c = txt.read(4)
2966 if not c:
2967 break
2968 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002969 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002970
2971 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002972 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002973 txt._CHUNK_SIZE = 4
2974
2975 reads = txt.read(4)
2976 reads += txt.read(4)
2977 reads += txt.readline()
2978 reads += txt.readline()
2979 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002980 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002981
2982 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002983 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002984 txt._CHUNK_SIZE = 4
2985
2986 reads = txt.read(4)
2987 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002988 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002989
2990 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002991 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002992 txt._CHUNK_SIZE = 4
2993
2994 reads = txt.read(4)
2995 pos = txt.tell()
2996 txt.seek(0)
2997 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002998 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002999
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003000 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003001 buffer = self.BytesIO(self.testdata)
3002 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003003
3004 self.assertEqual(buffer.seekable(), txt.seekable())
3005
Antoine Pitroue4501852009-05-14 18:55:55 +00003006 def test_append_bom(self):
3007 # The BOM is not written again when appending to a non-empty file
3008 filename = support.TESTFN
3009 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3010 with self.open(filename, 'w', encoding=charset) as f:
3011 f.write('aaa')
3012 pos = f.tell()
3013 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003014 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003015
3016 with self.open(filename, 'a', encoding=charset) as f:
3017 f.write('xxx')
3018 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003019 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003020
3021 def test_seek_bom(self):
3022 # Same test, but when seeking manually
3023 filename = support.TESTFN
3024 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3025 with self.open(filename, 'w', encoding=charset) as f:
3026 f.write('aaa')
3027 pos = f.tell()
3028 with self.open(filename, 'r+', encoding=charset) as f:
3029 f.seek(pos)
3030 f.write('zzz')
3031 f.seek(0)
3032 f.write('bbb')
3033 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003034 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003035
Antoine Pitrou85e3ee72015-04-13 20:01:21 +02003036 def test_seek_append_bom(self):
3037 # Same test, but first seek to the start and then to the end
3038 filename = support.TESTFN
3039 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3040 with self.open(filename, 'w', encoding=charset) as f:
3041 f.write('aaa')
3042 with self.open(filename, 'a', encoding=charset) as f:
3043 f.seek(0)
3044 f.seek(0, self.SEEK_END)
3045 f.write('xxx')
3046 with self.open(filename, 'rb') as f:
3047 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3048
Benjamin Peterson0926ad12009-06-06 18:02:12 +00003049 def test_errors_property(self):
3050 with self.open(support.TESTFN, "w") as f:
3051 self.assertEqual(f.errors, "strict")
3052 with self.open(support.TESTFN, "w", errors="replace") as f:
3053 self.assertEqual(f.errors, "replace")
3054
Brett Cannon31f59292011-02-21 19:29:56 +00003055 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00003056 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003057 def test_threads_write(self):
3058 # Issue6750: concurrent writes could duplicate data
3059 event = threading.Event()
3060 with self.open(support.TESTFN, "w", buffering=1) as f:
3061 def run(n):
3062 text = "Thread%03d\n" % n
3063 event.wait()
3064 f.write(text)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003065 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003066 for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003067 with support.start_threads(threads, event.set):
3068 time.sleep(0.02)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003069 with self.open(support.TESTFN) as f:
3070 content = f.read()
3071 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003072 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003073
Antoine Pitrou6be88762010-05-03 16:48:20 +00003074 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003075 # Test that text file is closed despite failed flush
3076 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00003077 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003078 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00003079 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003080 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003081 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00003082 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003083 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06003084 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003085 self.assertTrue(txt.buffer.closed)
3086 self.assertTrue(closed) # flush() called
3087 self.assertFalse(closed[0]) # flush() called before file closed
3088 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02003089 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003090
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003091 def test_close_error_on_close(self):
3092 buffer = self.BytesIO(self.testdata)
3093 def bad_flush():
3094 raise OSError('flush')
3095 def bad_close():
3096 raise OSError('close')
3097 buffer.close = bad_close
3098 txt = self.TextIOWrapper(buffer, encoding="ascii")
3099 txt.flush = bad_flush
3100 with self.assertRaises(OSError) as err: # exception not swallowed
3101 txt.close()
3102 self.assertEqual(err.exception.args, ('close',))
3103 self.assertIsInstance(err.exception.__context__, OSError)
3104 self.assertEqual(err.exception.__context__.args, ('flush',))
3105 self.assertFalse(txt.closed)
3106
3107 def test_nonnormalized_close_error_on_close(self):
3108 # Issue #21677
3109 buffer = self.BytesIO(self.testdata)
3110 def bad_flush():
3111 raise non_existing_flush
3112 def bad_close():
3113 raise non_existing_close
3114 buffer.close = bad_close
3115 txt = self.TextIOWrapper(buffer, encoding="ascii")
3116 txt.flush = bad_flush
3117 with self.assertRaises(NameError) as err: # exception not swallowed
3118 txt.close()
3119 self.assertIn('non_existing_close', str(err.exception))
3120 self.assertIsInstance(err.exception.__context__, NameError)
3121 self.assertIn('non_existing_flush', str(err.exception.__context__))
3122 self.assertFalse(txt.closed)
3123
Antoine Pitrou6be88762010-05-03 16:48:20 +00003124 def test_multi_close(self):
3125 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3126 txt.close()
3127 txt.close()
3128 txt.close()
3129 self.assertRaises(ValueError, txt.flush)
3130
Antoine Pitrou0d739d72010-09-05 23:01:12 +00003131 def test_unseekable(self):
3132 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
3133 self.assertRaises(self.UnsupportedOperation, txt.tell)
3134 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3135
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003136 def test_readonly_attributes(self):
3137 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3138 buf = self.BytesIO(self.testdata)
3139 with self.assertRaises(AttributeError):
3140 txt.buffer = buf
3141
Antoine Pitroue96ec682011-07-23 21:46:35 +02003142 def test_rawio(self):
3143 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3144 # that subprocess.Popen() can have the required unbuffered
3145 # semantics with universal_newlines=True.
3146 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3147 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3148 # Reads
3149 self.assertEqual(txt.read(4), 'abcd')
3150 self.assertEqual(txt.readline(), 'efghi\n')
3151 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3152
3153 def test_rawio_write_through(self):
3154 # Issue #12591: with write_through=True, writes don't need a flush
3155 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3156 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3157 write_through=True)
3158 txt.write('1')
3159 txt.write('23\n4')
3160 txt.write('5')
3161 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3162
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003163 def test_bufio_write_through(self):
3164 # Issue #21396: write_through=True doesn't force a flush()
3165 # on the underlying binary buffered object.
3166 flush_called, write_called = [], []
3167 class BufferedWriter(self.BufferedWriter):
3168 def flush(self, *args, **kwargs):
3169 flush_called.append(True)
3170 return super().flush(*args, **kwargs)
3171 def write(self, *args, **kwargs):
3172 write_called.append(True)
3173 return super().write(*args, **kwargs)
3174
3175 rawio = self.BytesIO()
3176 data = b"a"
3177 bufio = BufferedWriter(rawio, len(data)*2)
3178 textio = self.TextIOWrapper(bufio, encoding='ascii',
3179 write_through=True)
3180 # write to the buffered io but don't overflow the buffer
3181 text = data.decode('ascii')
3182 textio.write(text)
3183
3184 # buffer.flush is not called with write_through=True
3185 self.assertFalse(flush_called)
3186 # buffer.write *is* called with write_through=True
3187 self.assertTrue(write_called)
3188 self.assertEqual(rawio.getvalue(), b"") # no flush
3189
3190 write_called = [] # reset
3191 textio.write(text * 10) # total content is larger than bufio buffer
3192 self.assertTrue(write_called)
3193 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3194
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02003195 def test_reconfigure_write_through(self):
3196 raw = self.MockRawIO([])
3197 t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3198 t.write('1')
3199 t.reconfigure(write_through=True) # implied flush
3200 self.assertEqual(t.write_through, True)
3201 self.assertEqual(b''.join(raw._write_stack), b'1')
3202 t.write('23')
3203 self.assertEqual(b''.join(raw._write_stack), b'123')
3204 t.reconfigure(write_through=False)
3205 self.assertEqual(t.write_through, False)
3206 t.write('45')
3207 t.flush()
3208 self.assertEqual(b''.join(raw._write_stack), b'12345')
3209 # Keeping default value
3210 t.reconfigure()
3211 t.reconfigure(write_through=None)
3212 self.assertEqual(t.write_through, False)
3213 t.reconfigure(write_through=True)
3214 t.reconfigure()
3215 t.reconfigure(write_through=None)
3216 self.assertEqual(t.write_through, True)
3217
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003218 def test_read_nonbytes(self):
3219 # Issue #17106
3220 # Crash when underlying read() returns non-bytes
3221 t = self.TextIOWrapper(self.StringIO('a'))
3222 self.assertRaises(TypeError, t.read, 1)
3223 t = self.TextIOWrapper(self.StringIO('a'))
3224 self.assertRaises(TypeError, t.readline)
3225 t = self.TextIOWrapper(self.StringIO('a'))
3226 self.assertRaises(TypeError, t.read)
3227
3228 def test_illegal_decoder(self):
3229 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10003230 # Bypass the early encoding check added in issue 20404
3231 def _make_illegal_wrapper():
3232 quopri = codecs.lookup("quopri")
3233 quopri._is_text_encoding = True
3234 try:
3235 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3236 newline='\n', encoding="quopri")
3237 finally:
3238 quopri._is_text_encoding = False
3239 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003240 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10003241 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003242 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10003243 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003244 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10003245 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003246 self.assertRaises(TypeError, t.read)
3247
Antoine Pitrou712cb732013-12-21 15:51:54 +01003248 def _check_create_at_shutdown(self, **kwargs):
3249 # Issue #20037: creating a TextIOWrapper at shutdown
3250 # shouldn't crash the interpreter.
3251 iomod = self.io.__name__
3252 code = """if 1:
3253 import codecs
3254 import {iomod} as io
3255
3256 # Avoid looking up codecs at shutdown
3257 codecs.lookup('utf-8')
3258
3259 class C:
3260 def __init__(self):
3261 self.buf = io.BytesIO()
3262 def __del__(self):
3263 io.TextIOWrapper(self.buf, **{kwargs})
3264 print("ok")
3265 c = C()
3266 """.format(iomod=iomod, kwargs=kwargs)
3267 return assert_python_ok("-c", code)
3268
Serhiy Storchakaa7930372016-07-03 22:27:26 +03003269 @support.requires_type_collecting
Antoine Pitrou712cb732013-12-21 15:51:54 +01003270 def test_create_at_shutdown_without_encoding(self):
3271 rc, out, err = self._check_create_at_shutdown()
3272 if err:
3273 # Can error out with a RuntimeError if the module state
3274 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10003275 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01003276 else:
3277 self.assertEqual("ok", out.decode().strip())
3278
Serhiy Storchakaa7930372016-07-03 22:27:26 +03003279 @support.requires_type_collecting
Antoine Pitrou712cb732013-12-21 15:51:54 +01003280 def test_create_at_shutdown_with_encoding(self):
3281 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3282 errors='strict')
3283 self.assertFalse(err)
3284 self.assertEqual("ok", out.decode().strip())
3285
Antoine Pitroub8503892014-04-29 10:14:02 +02003286 def test_read_byteslike(self):
3287 r = MemviewBytesIO(b'Just some random string\n')
3288 t = self.TextIOWrapper(r, 'utf-8')
3289
3290 # TextIOwrapper will not read the full string, because
3291 # we truncate it to a multiple of the native int size
3292 # so that we can construct a more complex memoryview.
3293 bytes_val = _to_memoryview(r.getvalue()).tobytes()
3294
3295 self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3296
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003297 def test_issue22849(self):
3298 class F(object):
3299 def readable(self): return True
3300 def writable(self): return True
3301 def seekable(self): return True
3302
3303 for i in range(10):
3304 try:
3305 self.TextIOWrapper(F(), encoding='utf-8')
3306 except Exception:
3307 pass
3308
3309 F.tell = lambda x: 0
3310 t = self.TextIOWrapper(F(), encoding='utf-8')
3311
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003312
Antoine Pitroub8503892014-04-29 10:14:02 +02003313class MemviewBytesIO(io.BytesIO):
3314 '''A BytesIO object whose read method returns memoryviews
3315 rather than bytes'''
3316
3317 def read1(self, len_):
3318 return _to_memoryview(super().read1(len_))
3319
3320 def read(self, len_):
3321 return _to_memoryview(super().read(len_))
3322
3323def _to_memoryview(buf):
3324 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3325
3326 arr = array.array('i')
3327 idx = len(buf) - len(buf) % arr.itemsize
3328 arr.frombytes(buf[:idx])
3329 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003330
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003331
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003332class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003333 io = io
Steve Dowerfe8f4c92016-09-07 09:31:52 -07003334 shutdown_error = "RuntimeError: could not find io module state"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003335
3336 def test_initialization(self):
3337 r = self.BytesIO(b"\xc3\xa9\n\n")
3338 b = self.BufferedReader(r, 1000)
3339 t = self.TextIOWrapper(b)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003340 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
3341 self.assertRaises(ValueError, t.read)
3342
Benjamin Peterson10e76b62014-12-21 20:51:50 -06003343 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
3344 self.assertRaises(Exception, repr, t)
3345
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003346 def test_garbage_collection(self):
3347 # C TextIOWrapper objects are collected, and collecting them flushes
3348 # all data to disk.
3349 # The Python version has __del__, so it ends in gc.garbage instead.
Antoine Pitrou796564c2013-07-30 19:59:21 +02003350 with support.check_warnings(('', ResourceWarning)):
3351 rawio = io.FileIO(support.TESTFN, "wb")
3352 b = self.BufferedWriter(rawio)
3353 t = self.TextIOWrapper(b, encoding="ascii")
3354 t.write("456def")
3355 t.x = t
3356 wr = weakref.ref(t)
3357 del t
3358 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003359 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00003360 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003361 self.assertEqual(f.read(), b"456def")
3362
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003363 def test_rwpair_cleared_before_textio(self):
3364 # Issue 13070: TextIOWrapper's finalization would crash when called
3365 # after the reference to the underlying BufferedRWPair's writer got
3366 # cleared by the GC.
3367 for i in range(1000):
3368 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3369 t1 = self.TextIOWrapper(b1, encoding="ascii")
3370 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3371 t2 = self.TextIOWrapper(b2, encoding="ascii")
3372 # circular references
3373 t1.buddy = t2
3374 t2.buddy = t1
3375 support.gc_collect()
3376
3377
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003378class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003379 io = pyio
Steve Dower6d5cf8a2016-09-09 09:21:01 -07003380 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003381
3382
3383class IncrementalNewlineDecoderTest(unittest.TestCase):
3384
3385 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003386 # UTF-8 specific tests for a newline decoder
3387 def _check_decode(b, s, **kwargs):
3388 # We exercise getstate() / setstate() as well as decode()
3389 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003390 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003391 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003392 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003393
Antoine Pitrou180a3362008-12-14 16:36:46 +00003394 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003395
Antoine Pitrou180a3362008-12-14 16:36:46 +00003396 _check_decode(b'\xe8', "")
3397 _check_decode(b'\xa2', "")
3398 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003399
Antoine Pitrou180a3362008-12-14 16:36:46 +00003400 _check_decode(b'\xe8', "")
3401 _check_decode(b'\xa2', "")
3402 _check_decode(b'\x88', "\u8888")
3403
3404 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003405 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3406
Antoine Pitrou180a3362008-12-14 16:36:46 +00003407 decoder.reset()
3408 _check_decode(b'\n', "\n")
3409 _check_decode(b'\r', "")
3410 _check_decode(b'', "\n", final=True)
3411 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003412
Antoine Pitrou180a3362008-12-14 16:36:46 +00003413 _check_decode(b'\r', "")
3414 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003415
Antoine Pitrou180a3362008-12-14 16:36:46 +00003416 _check_decode(b'\r\r\n', "\n\n")
3417 _check_decode(b'\r', "")
3418 _check_decode(b'\r', "\n")
3419 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003420
Antoine Pitrou180a3362008-12-14 16:36:46 +00003421 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3422 _check_decode(b'\xe8\xa2\x88', "\u8888")
3423 _check_decode(b'\n', "\n")
3424 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3425 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003426
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003427 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003428 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003429 if encoding is not None:
3430 encoder = codecs.getincrementalencoder(encoding)()
3431 def _decode_bytewise(s):
3432 # Decode one byte at a time
3433 for b in encoder.encode(s):
3434 result.append(decoder.decode(bytes([b])))
3435 else:
3436 encoder = None
3437 def _decode_bytewise(s):
3438 # Decode one char at a time
3439 for c in s:
3440 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003441 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003442 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003443 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003444 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003445 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003446 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003447 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003448 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003449 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003450 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003451 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003452 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003453 input = "abc"
3454 if encoder is not None:
3455 encoder.reset()
3456 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003457 self.assertEqual(decoder.decode(input), "abc")
3458 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003459
3460 def test_newline_decoder(self):
3461 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003462 # None meaning the IncrementalNewlineDecoder takes unicode input
3463 # rather than bytes input
3464 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003465 'utf-16', 'utf-16-le', 'utf-16-be',
3466 'utf-32', 'utf-32-le', 'utf-32-be',
3467 )
3468 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003469 decoder = enc and codecs.getincrementaldecoder(enc)()
3470 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3471 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003472 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003473 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3474 self.check_newline_decoding_utf8(decoder)
3475
Antoine Pitrou66913e22009-03-06 23:40:56 +00003476 def test_newline_bytes(self):
3477 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3478 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003479 self.assertEqual(dec.newlines, None)
3480 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3481 self.assertEqual(dec.newlines, None)
3482 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3483 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003484 dec = self.IncrementalNewlineDecoder(None, translate=False)
3485 _check(dec)
3486 dec = self.IncrementalNewlineDecoder(None, translate=True)
3487 _check(dec)
3488
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003489class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3490 pass
3491
3492class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3493 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003494
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003495
Guido van Rossum01a27522007-03-07 01:00:12 +00003496# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003497
Guido van Rossum5abbf752007-08-27 17:39:33 +00003498class MiscIOTest(unittest.TestCase):
3499
Barry Warsaw40e82462008-11-20 20:14:50 +00003500 def tearDown(self):
3501 support.unlink(support.TESTFN)
3502
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003503 def test___all__(self):
3504 for name in self.io.__all__:
3505 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003506 self.assertIsNotNone(obj, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00003507 if name == "open":
3508 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003509 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003510 self.assertTrue(issubclass(obj, Exception), name)
3511 elif not name.startswith("SEEK_"):
3512 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003513
Barry Warsaw40e82462008-11-20 20:14:50 +00003514 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003515 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003516 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003517 f.close()
3518
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003519 with support.check_warnings(('', DeprecationWarning)):
3520 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003521 self.assertEqual(f.name, support.TESTFN)
3522 self.assertEqual(f.buffer.name, support.TESTFN)
3523 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3524 self.assertEqual(f.mode, "U")
3525 self.assertEqual(f.buffer.mode, "rb")
3526 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003527 f.close()
3528
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003529 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003530 self.assertEqual(f.mode, "w+")
3531 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3532 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003533
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003534 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003535 self.assertEqual(g.mode, "wb")
3536 self.assertEqual(g.raw.mode, "wb")
3537 self.assertEqual(g.name, f.fileno())
3538 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003539 f.close()
3540 g.close()
3541
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003542 def test_io_after_close(self):
3543 for kwargs in [
3544 {"mode": "w"},
3545 {"mode": "wb"},
3546 {"mode": "w", "buffering": 1},
3547 {"mode": "w", "buffering": 2},
3548 {"mode": "wb", "buffering": 0},
3549 {"mode": "r"},
3550 {"mode": "rb"},
3551 {"mode": "r", "buffering": 1},
3552 {"mode": "r", "buffering": 2},
3553 {"mode": "rb", "buffering": 0},
3554 {"mode": "w+"},
3555 {"mode": "w+b"},
3556 {"mode": "w+", "buffering": 1},
3557 {"mode": "w+", "buffering": 2},
3558 {"mode": "w+b", "buffering": 0},
3559 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003560 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003561 f.close()
3562 self.assertRaises(ValueError, f.flush)
3563 self.assertRaises(ValueError, f.fileno)
3564 self.assertRaises(ValueError, f.isatty)
3565 self.assertRaises(ValueError, f.__iter__)
3566 if hasattr(f, "peek"):
3567 self.assertRaises(ValueError, f.peek, 1)
3568 self.assertRaises(ValueError, f.read)
3569 if hasattr(f, "read1"):
3570 self.assertRaises(ValueError, f.read1, 1024)
Martin Panterccb2c0e2016-10-20 23:48:14 +00003571 self.assertRaises(ValueError, f.read1)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003572 if hasattr(f, "readall"):
3573 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003574 if hasattr(f, "readinto"):
3575 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003576 if hasattr(f, "readinto1"):
3577 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003578 self.assertRaises(ValueError, f.readline)
3579 self.assertRaises(ValueError, f.readlines)
Xiang Zhang026435c2017-04-15 12:47:28 +08003580 self.assertRaises(ValueError, f.readlines, 1)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003581 self.assertRaises(ValueError, f.seek, 0)
3582 self.assertRaises(ValueError, f.tell)
3583 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003584 self.assertRaises(ValueError, f.write,
3585 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003586 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003587 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003588
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003589 def test_blockingioerror(self):
3590 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003591 class C(str):
3592 pass
3593 c = C("")
3594 b = self.BlockingIOError(1, c)
3595 c.b = b
3596 b.c = c
3597 wr = weakref.ref(c)
3598 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003599 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003600 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003601
3602 def test_abcs(self):
3603 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003604 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3605 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3606 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3607 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003608
3609 def _check_abc_inheritance(self, abcmodule):
3610 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003611 self.assertIsInstance(f, abcmodule.IOBase)
3612 self.assertIsInstance(f, abcmodule.RawIOBase)
3613 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3614 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003615 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003616 self.assertIsInstance(f, abcmodule.IOBase)
3617 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3618 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3619 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003620 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003621 self.assertIsInstance(f, abcmodule.IOBase)
3622 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3623 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3624 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003625
3626 def test_abc_inheritance(self):
3627 # Test implementations inherit from their respective ABCs
3628 self._check_abc_inheritance(self)
3629
3630 def test_abc_inheritance_official(self):
3631 # Test implementations inherit from the official ABCs of the
3632 # baseline "io" module.
3633 self._check_abc_inheritance(io)
3634
Antoine Pitroue033e062010-10-29 10:38:18 +00003635 def _check_warn_on_dealloc(self, *args, **kwargs):
3636 f = open(*args, **kwargs)
3637 r = repr(f)
3638 with self.assertWarns(ResourceWarning) as cm:
3639 f = None
3640 support.gc_collect()
3641 self.assertIn(r, str(cm.warning.args[0]))
3642
3643 def test_warn_on_dealloc(self):
3644 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3645 self._check_warn_on_dealloc(support.TESTFN, "wb")
3646 self._check_warn_on_dealloc(support.TESTFN, "w")
3647
3648 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3649 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003650 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003651 for fd in fds:
3652 try:
3653 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003654 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003655 if e.errno != errno.EBADF:
3656 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003657 self.addCleanup(cleanup_fds)
3658 r, w = os.pipe()
3659 fds += r, w
3660 self._check_warn_on_dealloc(r, *args, **kwargs)
3661 # When using closefd=False, there's no warning
3662 r, w = os.pipe()
3663 fds += r, w
Serhiy Storchaka94a619d2016-02-11 13:11:44 +02003664 with support.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00003665 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00003666
3667 def test_warn_on_dealloc_fd(self):
3668 self._check_warn_on_dealloc_fd("rb", buffering=0)
3669 self._check_warn_on_dealloc_fd("rb")
3670 self._check_warn_on_dealloc_fd("r")
3671
3672
Antoine Pitrou243757e2010-11-05 21:15:39 +00003673 def test_pickling(self):
3674 # Pickling file objects is forbidden
3675 for kwargs in [
3676 {"mode": "w"},
3677 {"mode": "wb"},
3678 {"mode": "wb", "buffering": 0},
3679 {"mode": "r"},
3680 {"mode": "rb"},
3681 {"mode": "rb", "buffering": 0},
3682 {"mode": "w+"},
3683 {"mode": "w+b"},
3684 {"mode": "w+b", "buffering": 0},
3685 ]:
3686 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3687 with self.open(support.TESTFN, **kwargs) as f:
3688 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3689
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003690 def test_nonblock_pipe_write_bigbuf(self):
3691 self._test_nonblock_pipe_write(16*1024)
3692
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003693 def test_nonblock_pipe_write_smallbuf(self):
3694 self._test_nonblock_pipe_write(1024)
3695
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003696 @unittest.skipUnless(hasattr(os, 'set_blocking'),
3697 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003698 def _test_nonblock_pipe_write(self, bufsize):
3699 sent = []
3700 received = []
3701 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003702 os.set_blocking(r, False)
3703 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003704
3705 # To exercise all code paths in the C implementation we need
3706 # to play with buffer sizes. For instance, if we choose a
3707 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3708 # then we will never get a partial write of the buffer.
3709 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3710 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3711
3712 with rf, wf:
3713 for N in 9999, 73, 7574:
3714 try:
3715 i = 0
3716 while True:
3717 msg = bytes([i % 26 + 97]) * N
3718 sent.append(msg)
3719 wf.write(msg)
3720 i += 1
3721
3722 except self.BlockingIOError as e:
3723 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003724 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003725 sent[-1] = sent[-1][:e.characters_written]
3726 received.append(rf.read())
3727 msg = b'BLOCKED'
3728 wf.write(msg)
3729 sent.append(msg)
3730
3731 while True:
3732 try:
3733 wf.flush()
3734 break
3735 except self.BlockingIOError as e:
3736 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003737 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003738 self.assertEqual(e.characters_written, 0)
3739 received.append(rf.read())
3740
3741 received += iter(rf.read, None)
3742
3743 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003744 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003745 self.assertTrue(wf.closed)
3746 self.assertTrue(rf.closed)
3747
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003748 def test_create_fail(self):
3749 # 'x' mode fails if file is existing
3750 with self.open(support.TESTFN, 'w'):
3751 pass
3752 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3753
3754 def test_create_writes(self):
3755 # 'x' mode opens for writing
3756 with self.open(support.TESTFN, 'xb') as f:
3757 f.write(b"spam")
3758 with self.open(support.TESTFN, 'rb') as f:
3759 self.assertEqual(b"spam", f.read())
3760
Christian Heimes7b648752012-09-10 14:48:43 +02003761 def test_open_allargs(self):
3762 # there used to be a buffer overflow in the parser for rawmode
3763 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3764
3765
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003766class CMiscIOTest(MiscIOTest):
3767 io = io
3768
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03003769 def test_readinto_buffer_overflow(self):
3770 # Issue #18025
3771 class BadReader(self.io.BufferedIOBase):
3772 def read(self, n=-1):
3773 return b'x' * 10**6
3774 bufio = BadReader()
3775 b = bytearray(2)
3776 self.assertRaises(ValueError, bufio.readinto, b)
3777
Antoine Pitrou25f85d42015-04-13 19:41:47 +02003778 @unittest.skipUnless(threading, 'Threading required for this test.')
3779 def check_daemon_threads_shutdown_deadlock(self, stream_name):
3780 # Issue #23309: deadlocks at shutdown should be avoided when a
3781 # daemon thread and the main thread both write to a file.
3782 code = """if 1:
3783 import sys
3784 import time
3785 import threading
Victor Stinner2a1aed02017-04-21 17:59:23 +02003786 from test.support import SuppressCrashReport
Antoine Pitrou25f85d42015-04-13 19:41:47 +02003787
3788 file = sys.{stream_name}
3789
3790 def run():
3791 while True:
3792 file.write('.')
3793 file.flush()
3794
Victor Stinner2a1aed02017-04-21 17:59:23 +02003795 crash = SuppressCrashReport()
3796 crash.__enter__()
3797 # don't call __exit__(): the crash occurs at Python shutdown
3798
Antoine Pitrou25f85d42015-04-13 19:41:47 +02003799 thread = threading.Thread(target=run)
3800 thread.daemon = True
3801 thread.start()
3802
3803 time.sleep(0.5)
3804 file.write('!')
3805 file.flush()
3806 """.format_map(locals())
3807 res, _ = run_python_until_end("-c", code)
3808 err = res.err.decode()
3809 if res.rc != 0:
3810 # Failure: should be a fatal error
3811 self.assertIn("Fatal Python error: could not acquire lock "
3812 "for <_io.BufferedWriter name='<{stream_name}>'> "
3813 "at interpreter shutdown, possibly due to "
3814 "daemon threads".format_map(locals()),
3815 err)
3816 else:
3817 self.assertFalse(err.strip('.!'))
3818
3819 def test_daemon_threads_shutdown_stdout_deadlock(self):
3820 self.check_daemon_threads_shutdown_deadlock('stdout')
3821
3822 def test_daemon_threads_shutdown_stderr_deadlock(self):
3823 self.check_daemon_threads_shutdown_deadlock('stderr')
3824
3825
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003826class PyMiscIOTest(MiscIOTest):
3827 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003828
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003829
3830@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3831class SignalsTest(unittest.TestCase):
3832
3833 def setUp(self):
3834 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3835
3836 def tearDown(self):
3837 signal.signal(signal.SIGALRM, self.oldalrm)
3838
3839 def alarm_interrupt(self, sig, frame):
3840 1/0
3841
3842 @unittest.skipUnless(threading, 'Threading required for this test.')
3843 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3844 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00003845 invokes the signal handler, and bubbles up the exception raised
3846 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003847 read_results = []
3848 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02003849 if hasattr(signal, 'pthread_sigmask'):
3850 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003851 s = os.read(r, 1)
3852 read_results.append(s)
3853 t = threading.Thread(target=_read)
3854 t.daemon = True
3855 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00003856 fdopen_kwargs["closefd"] = False
Victor Stinner9c182a12015-03-16 18:03:06 +01003857 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003858 try:
3859 wio = self.io.open(w, **fdopen_kwargs)
3860 t.start()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003861 # Fill the pipe enough that the write will be blocking.
3862 # It will be interrupted by the timer armed above. Since the
3863 # other thread has read one byte, the low-level write will
3864 # return with a successful (partial) result rather than an EINTR.
3865 # The buffered IO layer must check for pending signal
3866 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02003867 signal.alarm(1)
3868 try:
Victor Stinner9c182a12015-03-16 18:03:06 +01003869 self.assertRaises(ZeroDivisionError, wio.write, large_data)
Victor Stinner775b2dd2013-07-15 19:53:13 +02003870 finally:
3871 signal.alarm(0)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003872 t.join()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003873 # We got one byte, get another one and check that it isn't a
3874 # repeat of the first one.
3875 read_results.append(os.read(r, 1))
3876 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3877 finally:
3878 os.close(w)
3879 os.close(r)
3880 # This is deliberate. If we didn't close the file descriptor
3881 # before closing wio, wio would try to flush its internal
3882 # buffer, and block again.
3883 try:
3884 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003885 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003886 if e.errno != errno.EBADF:
3887 raise
3888
3889 def test_interrupted_write_unbuffered(self):
3890 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3891
3892 def test_interrupted_write_buffered(self):
3893 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3894
Victor Stinner6ab72862014-09-03 23:32:28 +02003895 # Issue #22331: The test hangs on FreeBSD 7.2
3896 @support.requires_freebsd_version(8)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003897 def test_interrupted_write_text(self):
3898 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3899
Brett Cannon31f59292011-02-21 19:29:56 +00003900 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003901 def check_reentrant_write(self, data, **fdopen_kwargs):
3902 def on_alarm(*args):
3903 # Will be called reentrantly from the same thread
3904 wio.write(data)
3905 1/0
3906 signal.signal(signal.SIGALRM, on_alarm)
3907 r, w = os.pipe()
3908 wio = self.io.open(w, **fdopen_kwargs)
3909 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003910 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003911 # Either the reentrant call to wio.write() fails with RuntimeError,
3912 # or the signal handler raises ZeroDivisionError.
3913 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3914 while 1:
3915 for i in range(100):
3916 wio.write(data)
3917 wio.flush()
3918 # Make sure the buffer doesn't fill up and block further writes
3919 os.read(r, len(data) * 100)
3920 exc = cm.exception
3921 if isinstance(exc, RuntimeError):
3922 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3923 finally:
3924 wio.close()
3925 os.close(r)
3926
3927 def test_reentrant_write_buffered(self):
3928 self.check_reentrant_write(b"xy", mode="wb")
3929
3930 def test_reentrant_write_text(self):
3931 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3932
Antoine Pitrou707ce822011-02-25 21:24:11 +00003933 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3934 """Check that a buffered read, when it gets interrupted (either
3935 returning a partial result or EINTR), properly invokes the signal
3936 handler and retries if the latter returned successfully."""
3937 r, w = os.pipe()
3938 fdopen_kwargs["closefd"] = False
3939 def alarm_handler(sig, frame):
3940 os.write(w, b"bar")
3941 signal.signal(signal.SIGALRM, alarm_handler)
3942 try:
3943 rio = self.io.open(r, **fdopen_kwargs)
3944 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003945 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003946 # Expected behaviour:
3947 # - first raw read() returns partial b"foo"
3948 # - second raw read() returns EINTR
3949 # - third raw read() returns b"bar"
3950 self.assertEqual(decode(rio.read(6)), "foobar")
3951 finally:
3952 rio.close()
3953 os.close(w)
3954 os.close(r)
3955
Antoine Pitrou20db5112011-08-19 20:32:34 +02003956 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003957 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3958 mode="rb")
3959
Antoine Pitrou20db5112011-08-19 20:32:34 +02003960 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003961 self.check_interrupted_read_retry(lambda x: x,
3962 mode="r")
3963
3964 @unittest.skipUnless(threading, 'Threading required for this test.')
3965 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3966 """Check that a buffered write, when it gets interrupted (either
3967 returning a partial result or EINTR), properly invokes the signal
3968 handler and retries if the latter returned successfully."""
3969 select = support.import_module("select")
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003970
Antoine Pitrou707ce822011-02-25 21:24:11 +00003971 # A quantity that exceeds the buffer size of an anonymous pipe's
3972 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02003973 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00003974 r, w = os.pipe()
3975 fdopen_kwargs["closefd"] = False
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003976
Antoine Pitrou707ce822011-02-25 21:24:11 +00003977 # We need a separate thread to read from the pipe and allow the
3978 # write() to finish. This thread is started after the SIGALRM is
3979 # received (forcing a first EINTR in write()).
3980 read_results = []
3981 write_finished = False
Serhiy Storchaka9db55002015-03-28 20:38:37 +02003982 error = None
Antoine Pitrou707ce822011-02-25 21:24:11 +00003983 def _read():
Serhiy Storchaka9db55002015-03-28 20:38:37 +02003984 try:
3985 while not write_finished:
3986 while r in select.select([r], [], [], 1.0)[0]:
3987 s = os.read(r, 1024)
3988 read_results.append(s)
3989 except BaseException as exc:
3990 nonlocal error
3991 error = exc
Antoine Pitrou707ce822011-02-25 21:24:11 +00003992 t = threading.Thread(target=_read)
3993 t.daemon = True
3994 def alarm1(sig, frame):
3995 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003996 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003997 def alarm2(sig, frame):
3998 t.start()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003999
4000 large_data = item * N
Antoine Pitrou707ce822011-02-25 21:24:11 +00004001 signal.signal(signal.SIGALRM, alarm1)
4002 try:
4003 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004004 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004005 # Expected behaviour:
4006 # - first raw write() is partial (because of the limited pipe buffer
4007 # and the first alarm)
4008 # - second raw write() returns EINTR (because of the second alarm)
4009 # - subsequent write()s are successful (either partial or complete)
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004010 written = wio.write(large_data)
4011 self.assertEqual(N, written)
4012
Antoine Pitrou707ce822011-02-25 21:24:11 +00004013 wio.flush()
4014 write_finished = True
4015 t.join()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004016
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004017 self.assertIsNone(error)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004018 self.assertEqual(N, sum(len(x) for x in read_results))
4019 finally:
4020 write_finished = True
4021 os.close(w)
4022 os.close(r)
4023 # This is deliberate. If we didn't close the file descriptor
4024 # before closing wio, wio would try to flush its internal
4025 # buffer, and could block (in case of failure).
4026 try:
4027 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004028 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00004029 if e.errno != errno.EBADF:
4030 raise
4031
Antoine Pitrou20db5112011-08-19 20:32:34 +02004032 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004033 self.check_interrupted_write_retry(b"x", mode="wb")
4034
Antoine Pitrou20db5112011-08-19 20:32:34 +02004035 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004036 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4037
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004038
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004039class CSignalsTest(SignalsTest):
4040 io = io
4041
4042class PySignalsTest(SignalsTest):
4043 io = pyio
4044
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004045 # Handling reentrancy issues would slow down _pyio even more, so the
4046 # tests are disabled.
4047 test_reentrant_write_buffered = None
4048 test_reentrant_write_text = None
4049
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004050
Ezio Melottidaa42c72013-03-23 16:30:16 +02004051def load_tests(*args):
Gregory P. Smith1bef9072015-04-14 13:24:34 -07004052 tests = (CIOTest, PyIOTest, APIMismatchTest,
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004053 CBufferedReaderTest, PyBufferedReaderTest,
4054 CBufferedWriterTest, PyBufferedWriterTest,
4055 CBufferedRWPairTest, PyBufferedRWPairTest,
4056 CBufferedRandomTest, PyBufferedRandomTest,
4057 StatefulIncrementalDecoderTest,
4058 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
4059 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004060 CMiscIOTest, PyMiscIOTest,
4061 CSignalsTest, PySignalsTest,
4062 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004063
4064 # Put the namespaces of the IO module we are testing and some useful mock
4065 # classes in the __dict__ of each test.
4066 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00004067 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004068 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
4069 c_io_ns = {name : getattr(io, name) for name in all_members}
4070 py_io_ns = {name : getattr(pyio, name) for name in all_members}
4071 globs = globals()
4072 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
4073 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
4074 # Avoid turning open into a bound method.
4075 py_io_ns["open"] = pyio.OpenWrapper
4076 for test in tests:
4077 if test.__name__.startswith("C"):
4078 for name, obj in c_io_ns.items():
4079 setattr(test, name, obj)
4080 elif test.__name__.startswith("Py"):
4081 for name, obj in py_io_ns.items():
4082 setattr(test, name, obj)
4083
Ezio Melottidaa42c72013-03-23 16:30:16 +02004084 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
4085 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004086
4087if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02004088 unittest.main()