blob: 48270c88be6faf59a25353a0b0eaa65ce7b949e1 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Berker Peksagce643912015-05-06 06:33:17 +030038from test.support.script_helper import assert_python_ok, run_python_until_end
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000047
Martin Panter6bb91f32016-05-28 00:41:57 +000048try:
49 import ctypes
50except ImportError:
51 def byteslike(*pos, **kw):
52 return array.array("b", bytes(*pos, **kw))
53else:
54 def byteslike(*pos, **kw):
55 """Create a bytes-like object having no string or sequence methods"""
56 data = bytes(*pos, **kw)
57 obj = EmptyStruct()
58 ctypes.resize(obj, len(data))
59 memoryview(obj).cast("B")[:] = data
60 return obj
61 class EmptyStruct(ctypes.Structure):
62 pass
63
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000064def _default_chunk_size():
65 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000066 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000067 return f._CHUNK_SIZE
68
69
Antoine Pitrou328ec742010-09-14 18:37:24 +000070class MockRawIOWithoutRead:
71 """A RawIO implementation without read(), so as to exercise the default
72 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000073
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000074 def __init__(self, read_stack=()):
75 self._read_stack = list(read_stack)
76 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000077 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000078 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000079
Guido van Rossum01a27522007-03-07 01:00:12 +000080 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000081 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000082 return len(b)
83
84 def writable(self):
85 return True
86
Guido van Rossum68bbcd22007-02-27 17:19:33 +000087 def fileno(self):
88 return 42
89
90 def readable(self):
91 return True
92
Guido van Rossum01a27522007-03-07 01:00:12 +000093 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000094 return True
95
Guido van Rossum01a27522007-03-07 01:00:12 +000096 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000097 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000098
99 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000100 return 0 # same comment as above
101
102 def readinto(self, buf):
103 self._reads += 1
104 max_len = len(buf)
105 try:
106 data = self._read_stack[0]
107 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000108 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000109 return 0
110 if data is None:
111 del self._read_stack[0]
112 return None
113 n = len(data)
114 if len(data) <= max_len:
115 del self._read_stack[0]
116 buf[:n] = data
117 return n
118 else:
119 buf[:] = data[:max_len]
120 self._read_stack[0] = data[max_len:]
121 return max_len
122
123 def truncate(self, pos=None):
124 return pos
125
Antoine Pitrou328ec742010-09-14 18:37:24 +0000126class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
127 pass
128
129class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
130 pass
131
132
133class MockRawIO(MockRawIOWithoutRead):
134
135 def read(self, n=None):
136 self._reads += 1
137 try:
138 return self._read_stack.pop(0)
139 except:
140 self._extraneous_reads += 1
141 return b""
142
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000143class CMockRawIO(MockRawIO, io.RawIOBase):
144 pass
145
146class PyMockRawIO(MockRawIO, pyio.RawIOBase):
147 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000148
Guido van Rossuma9e20242007-03-08 00:43:48 +0000149
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000150class MisbehavedRawIO(MockRawIO):
151 def write(self, b):
152 return super().write(b) * 2
153
154 def read(self, n=None):
155 return super().read(n) * 2
156
157 def seek(self, pos, whence):
158 return -123
159
160 def tell(self):
161 return -456
162
163 def readinto(self, buf):
164 super().readinto(buf)
165 return len(buf) * 5
166
167class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
168 pass
169
170class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
171 pass
172
173
174class CloseFailureIO(MockRawIO):
175 closed = 0
176
177 def close(self):
178 if not self.closed:
179 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200180 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000181
182class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
183 pass
184
185class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
186 pass
187
188
189class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000190
191 def __init__(self, data):
192 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000193 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000194
195 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000196 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000197 self.read_history.append(None if res is None else len(res))
198 return res
199
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000200 def readinto(self, b):
201 res = super().readinto(b)
202 self.read_history.append(res)
203 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000204
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000205class CMockFileIO(MockFileIO, io.BytesIO):
206 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000207
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000208class PyMockFileIO(MockFileIO, pyio.BytesIO):
209 pass
210
211
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000212class MockUnseekableIO:
213 def seekable(self):
214 return False
215
216 def seek(self, *args):
217 raise self.UnsupportedOperation("not seekable")
218
219 def tell(self, *args):
220 raise self.UnsupportedOperation("not seekable")
221
Martin Panter754aab22016-03-31 07:21:56 +0000222 def truncate(self, *args):
223 raise self.UnsupportedOperation("not seekable")
224
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000225class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
226 UnsupportedOperation = io.UnsupportedOperation
227
228class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
229 UnsupportedOperation = pyio.UnsupportedOperation
230
231
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000232class MockNonBlockWriterIO:
233
234 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000235 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000236 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000237
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000238 def pop_written(self):
239 s = b"".join(self._write_stack)
240 self._write_stack[:] = []
241 return s
242
243 def block_on(self, char):
244 """Block when a given char is encountered."""
245 self._blocker_char = char
246
247 def readable(self):
248 return True
249
250 def seekable(self):
251 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000252
Guido van Rossum01a27522007-03-07 01:00:12 +0000253 def writable(self):
254 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000255
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000256 def write(self, b):
257 b = bytes(b)
258 n = -1
259 if self._blocker_char:
260 try:
261 n = b.index(self._blocker_char)
262 except ValueError:
263 pass
264 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100265 if n > 0:
266 # write data up to the first blocker
267 self._write_stack.append(b[:n])
268 return n
269 else:
270 # cancel blocker and indicate would block
271 self._blocker_char = None
272 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000273 self._write_stack.append(b)
274 return len(b)
275
276class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
277 BlockingIOError = io.BlockingIOError
278
279class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
280 BlockingIOError = pyio.BlockingIOError
281
Guido van Rossuma9e20242007-03-08 00:43:48 +0000282
Guido van Rossum28524c72007-02-27 05:47:44 +0000283class IOTest(unittest.TestCase):
284
Neal Norwitze7789b12008-03-24 06:18:09 +0000285 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000286 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000287
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000288 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000289 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000290
Guido van Rossum28524c72007-02-27 05:47:44 +0000291 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000292 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000293 f.truncate(0)
294 self.assertEqual(f.tell(), 5)
295 f.seek(0)
296
297 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000298 self.assertEqual(f.seek(0), 0)
299 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000300 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000301 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000302 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000303 buffer = bytearray(b" world\n\n\n")
304 self.assertEqual(f.write(buffer), 9)
305 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000306 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000307 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000308 self.assertEqual(f.seek(-1, 2), 13)
309 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000310
Guido van Rossum87429772007-04-10 21:06:59 +0000311 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000312 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000313 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000314
Guido van Rossum9b76da62007-04-11 01:09:03 +0000315 def read_ops(self, f, buffered=False):
316 data = f.read(5)
317 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000318 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000319 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000320 self.assertEqual(bytes(data), b" worl")
321 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000322 self.assertEqual(f.readinto(data), 2)
323 self.assertEqual(len(data), 5)
324 self.assertEqual(data[:2], b"d\n")
325 self.assertEqual(f.seek(0), 0)
326 self.assertEqual(f.read(20), b"hello world\n")
327 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000328 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000329 self.assertEqual(f.seek(-6, 2), 6)
330 self.assertEqual(f.read(5), b"world")
331 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000332 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000333 self.assertEqual(f.seek(-6, 1), 5)
334 self.assertEqual(f.read(5), b" worl")
335 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000336 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000337 if buffered:
338 f.seek(0)
339 self.assertEqual(f.read(), b"hello world\n")
340 f.seek(6)
341 self.assertEqual(f.read(), b"world\n")
342 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000343 f.seek(0)
344 data = byteslike(5)
345 self.assertEqual(f.readinto1(data), 5)
346 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000347
Guido van Rossum34d69e52007-04-10 20:08:41 +0000348 LARGE = 2**31
349
Guido van Rossum53807da2007-04-10 19:01:47 +0000350 def large_file_ops(self, f):
351 assert f.readable()
352 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100353 try:
354 self.assertEqual(f.seek(self.LARGE), self.LARGE)
355 except (OverflowError, ValueError):
356 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000357 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000358 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000359 self.assertEqual(f.tell(), self.LARGE + 3)
360 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000361 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000362 self.assertEqual(f.tell(), self.LARGE + 2)
363 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000364 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000365 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000366 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
367 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000368 self.assertEqual(f.read(2), b"x")
369
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000370 def test_invalid_operations(self):
371 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000372 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000373 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000374 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000375 self.assertRaises(exc, fp.read)
376 self.assertRaises(exc, fp.readline)
377 with self.open(support.TESTFN, "wb", buffering=0) as fp:
378 self.assertRaises(exc, fp.read)
379 self.assertRaises(exc, fp.readline)
380 with self.open(support.TESTFN, "rb", buffering=0) as fp:
381 self.assertRaises(exc, fp.write, b"blah")
382 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000383 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000384 self.assertRaises(exc, fp.write, b"blah")
385 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000386 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000387 self.assertRaises(exc, fp.write, "blah")
388 self.assertRaises(exc, fp.writelines, ["blah\n"])
389 # Non-zero seeking from current or end pos
390 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
391 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000392
Martin Panter754aab22016-03-31 07:21:56 +0000393 def test_optional_abilities(self):
394 # Test for OSError when optional APIs are not supported
395 # The purpose of this test is to try fileno(), reading, writing and
396 # seeking operations with various objects that indicate they do not
397 # support these operations.
398
399 def pipe_reader():
400 [r, w] = os.pipe()
401 os.close(w) # So that read() is harmless
402 return self.FileIO(r, "r")
403
404 def pipe_writer():
405 [r, w] = os.pipe()
406 self.addCleanup(os.close, r)
407 # Guarantee that we can write into the pipe without blocking
408 thread = threading.Thread(target=os.read, args=(r, 100))
409 thread.start()
410 self.addCleanup(thread.join)
411 return self.FileIO(w, "w")
412
413 def buffered_reader():
414 return self.BufferedReader(self.MockUnseekableIO())
415
416 def buffered_writer():
417 return self.BufferedWriter(self.MockUnseekableIO())
418
419 def buffered_random():
420 return self.BufferedRandom(self.BytesIO())
421
422 def buffered_rw_pair():
423 return self.BufferedRWPair(self.MockUnseekableIO(),
424 self.MockUnseekableIO())
425
426 def text_reader():
427 class UnseekableReader(self.MockUnseekableIO):
428 writable = self.BufferedIOBase.writable
429 write = self.BufferedIOBase.write
430 return self.TextIOWrapper(UnseekableReader(), "ascii")
431
432 def text_writer():
433 class UnseekableWriter(self.MockUnseekableIO):
434 readable = self.BufferedIOBase.readable
435 read = self.BufferedIOBase.read
436 return self.TextIOWrapper(UnseekableWriter(), "ascii")
437
438 tests = (
439 (pipe_reader, "fr"), (pipe_writer, "fw"),
440 (buffered_reader, "r"), (buffered_writer, "w"),
441 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
442 (text_reader, "r"), (text_writer, "w"),
443 (self.BytesIO, "rws"), (self.StringIO, "rws"),
444 )
445 for [test, abilities] in tests:
446 if test is pipe_writer and not threading:
447 continue # Skip subtest that uses a background thread
448 with self.subTest(test), test() as obj:
449 readable = "r" in abilities
450 self.assertEqual(obj.readable(), readable)
451 writable = "w" in abilities
452 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000453
454 if isinstance(obj, self.TextIOBase):
455 data = "3"
456 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
457 data = b"3"
458 else:
459 self.fail("Unknown base class")
460
461 if "f" in abilities:
462 obj.fileno()
463 else:
464 self.assertRaises(OSError, obj.fileno)
465
466 if readable:
467 obj.read(1)
468 obj.read()
469 else:
470 self.assertRaises(OSError, obj.read, 1)
471 self.assertRaises(OSError, obj.read)
472
473 if writable:
474 obj.write(data)
475 else:
476 self.assertRaises(OSError, obj.write, data)
477
Martin Panter3ee147f2016-03-31 21:05:31 +0000478 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000479 pipe_reader, pipe_writer):
480 # Pipes seem to appear as seekable on Windows
481 continue
482 seekable = "s" in abilities
483 self.assertEqual(obj.seekable(), seekable)
484
Martin Panter754aab22016-03-31 07:21:56 +0000485 if seekable:
486 obj.tell()
487 obj.seek(0)
488 else:
489 self.assertRaises(OSError, obj.tell)
490 self.assertRaises(OSError, obj.seek, 0)
491
492 if writable and seekable:
493 obj.truncate()
494 obj.truncate(0)
495 else:
496 self.assertRaises(OSError, obj.truncate)
497 self.assertRaises(OSError, obj.truncate, 0)
498
Antoine Pitrou13348842012-01-29 18:36:34 +0100499 def test_open_handles_NUL_chars(self):
500 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300501 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100502
503 bytes_fn = bytes(fn_with_NUL, 'ascii')
504 with warnings.catch_warnings():
505 warnings.simplefilter("ignore", DeprecationWarning)
506 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100507
Guido van Rossum28524c72007-02-27 05:47:44 +0000508 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000509 with self.open(support.TESTFN, "wb", buffering=0) as f:
510 self.assertEqual(f.readable(), False)
511 self.assertEqual(f.writable(), True)
512 self.assertEqual(f.seekable(), True)
513 self.write_ops(f)
514 with self.open(support.TESTFN, "rb", buffering=0) as f:
515 self.assertEqual(f.readable(), True)
516 self.assertEqual(f.writable(), False)
517 self.assertEqual(f.seekable(), True)
518 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000519
Guido van Rossum87429772007-04-10 21:06:59 +0000520 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000521 with self.open(support.TESTFN, "wb") as f:
522 self.assertEqual(f.readable(), False)
523 self.assertEqual(f.writable(), True)
524 self.assertEqual(f.seekable(), True)
525 self.write_ops(f)
526 with self.open(support.TESTFN, "rb") as f:
527 self.assertEqual(f.readable(), True)
528 self.assertEqual(f.writable(), False)
529 self.assertEqual(f.seekable(), True)
530 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000531
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000532 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000533 with self.open(support.TESTFN, "wb") as f:
534 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
535 with self.open(support.TESTFN, "rb") as f:
536 self.assertEqual(f.readline(), b"abc\n")
537 self.assertEqual(f.readline(10), b"def\n")
538 self.assertEqual(f.readline(2), b"xy")
539 self.assertEqual(f.readline(4), b"zzy\n")
540 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000541 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000542 self.assertRaises(TypeError, f.readline, 5.3)
543 with self.open(support.TESTFN, "r") as f:
544 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000545
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300546 def test_readline_nonsizeable(self):
547 # Issue #30061
548 # Crash when readline() returns an object without __len__
549 class R(self.IOBase):
550 def readline(self):
551 return None
552 self.assertRaises((TypeError, StopIteration), next, R())
553
554 def test_next_nonsizeable(self):
555 # Issue #30061
556 # Crash when __next__() returns an object without __len__
557 class R(self.IOBase):
558 def __next__(self):
559 return None
560 self.assertRaises(TypeError, R().readlines, 1)
561
Guido van Rossum28524c72007-02-27 05:47:44 +0000562 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000563 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000564 self.write_ops(f)
565 data = f.getvalue()
566 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000567 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000568 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000569
Guido van Rossum53807da2007-04-10 19:01:47 +0000570 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000571 # On Windows and Mac OSX this test comsumes large resources; It takes
572 # a long time to build the >2GB file and takes >2GB of disk space
573 # therefore the resource must be enabled to run this test.
574 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600575 support.requires(
576 'largefile',
577 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000578 with self.open(support.TESTFN, "w+b", 0) as f:
579 self.large_file_ops(f)
580 with self.open(support.TESTFN, "w+b") as f:
581 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000582
583 def test_with_open(self):
584 for bufsize in (0, 1, 100):
585 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000586 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000587 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000588 self.assertEqual(f.closed, True)
589 f = None
590 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000591 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000592 1/0
593 except ZeroDivisionError:
594 self.assertEqual(f.closed, True)
595 else:
596 self.fail("1/0 didn't raise an exception")
597
Antoine Pitrou08838b62009-01-21 00:55:13 +0000598 # issue 5008
599 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000600 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000601 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000602 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000603 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000604 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000605 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000606 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300607 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000608
Guido van Rossum87429772007-04-10 21:06:59 +0000609 def test_destructor(self):
610 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000611 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000612 def __del__(self):
613 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000614 try:
615 f = super().__del__
616 except AttributeError:
617 pass
618 else:
619 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000620 def close(self):
621 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000622 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000623 def flush(self):
624 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000625 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000626 with support.check_warnings(('', ResourceWarning)):
627 f = MyFileIO(support.TESTFN, "wb")
628 f.write(b"xxx")
629 del f
630 support.gc_collect()
631 self.assertEqual(record, [1, 2, 3])
632 with self.open(support.TESTFN, "rb") as f:
633 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000634
635 def _check_base_destructor(self, base):
636 record = []
637 class MyIO(base):
638 def __init__(self):
639 # This exercises the availability of attributes on object
640 # destruction.
641 # (in the C version, close() is called by the tp_dealloc
642 # function, not by __del__)
643 self.on_del = 1
644 self.on_close = 2
645 self.on_flush = 3
646 def __del__(self):
647 record.append(self.on_del)
648 try:
649 f = super().__del__
650 except AttributeError:
651 pass
652 else:
653 f()
654 def close(self):
655 record.append(self.on_close)
656 super().close()
657 def flush(self):
658 record.append(self.on_flush)
659 super().flush()
660 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000661 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000662 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000663 self.assertEqual(record, [1, 2, 3])
664
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000665 def test_IOBase_destructor(self):
666 self._check_base_destructor(self.IOBase)
667
668 def test_RawIOBase_destructor(self):
669 self._check_base_destructor(self.RawIOBase)
670
671 def test_BufferedIOBase_destructor(self):
672 self._check_base_destructor(self.BufferedIOBase)
673
674 def test_TextIOBase_destructor(self):
675 self._check_base_destructor(self.TextIOBase)
676
Guido van Rossum87429772007-04-10 21:06:59 +0000677 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000678 with self.open(support.TESTFN, "wb") as f:
679 f.write(b"xxx")
680 with self.open(support.TESTFN, "rb") as f:
681 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000682
Guido van Rossumd4103952007-04-12 05:44:49 +0000683 def test_array_writes(self):
684 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000685 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000686 def check(f):
687 with f:
688 self.assertEqual(f.write(a), n)
689 f.writelines((a,))
690 check(self.BytesIO())
691 check(self.FileIO(support.TESTFN, "w"))
692 check(self.BufferedWriter(self.MockRawIO()))
693 check(self.BufferedRandom(self.MockRawIO()))
694 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000695
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000696 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000697 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000698 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000699
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000700 def test_read_closed(self):
701 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000702 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000703 with self.open(support.TESTFN, "r") as f:
704 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000705 self.assertEqual(file.read(), "egg\n")
706 file.seek(0)
707 file.close()
708 self.assertRaises(ValueError, file.read)
709
710 def test_no_closefd_with_filename(self):
711 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000712 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000713
714 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000715 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000716 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000717 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000718 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000719 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000720 self.assertEqual(file.buffer.raw.closefd, False)
721
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000722 def test_garbage_collection(self):
723 # FileIO objects are collected, and collecting them flushes
724 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000725 with support.check_warnings(('', ResourceWarning)):
726 f = self.FileIO(support.TESTFN, "wb")
727 f.write(b"abcxxx")
728 f.f = f
729 wr = weakref.ref(f)
730 del f
731 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300732 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000733 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000734 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000735
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000736 def test_unbounded_file(self):
737 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
738 zero = "/dev/zero"
739 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000740 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000741 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000742 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000743 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000744 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000745 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000746 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000747 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000748 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000749 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000750 self.assertRaises(OverflowError, f.read)
751
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200752 def check_flush_error_on_close(self, *args, **kwargs):
753 # Test that the file is closed despite failed flush
754 # and that flush() is called before file closed.
755 f = self.open(*args, **kwargs)
756 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000757 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200758 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200759 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000760 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200761 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600762 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200763 self.assertTrue(closed) # flush() called
764 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200765 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200766
767 def test_flush_error_on_close(self):
768 # raw file
769 # Issue #5700: io.FileIO calls flush() after file closed
770 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
771 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
772 self.check_flush_error_on_close(fd, 'wb', buffering=0)
773 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
774 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
775 os.close(fd)
776 # buffered io
777 self.check_flush_error_on_close(support.TESTFN, 'wb')
778 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
779 self.check_flush_error_on_close(fd, 'wb')
780 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
781 self.check_flush_error_on_close(fd, 'wb', closefd=False)
782 os.close(fd)
783 # text io
784 self.check_flush_error_on_close(support.TESTFN, 'w')
785 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
786 self.check_flush_error_on_close(fd, 'w')
787 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
788 self.check_flush_error_on_close(fd, 'w', closefd=False)
789 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000790
791 def test_multi_close(self):
792 f = self.open(support.TESTFN, "wb", buffering=0)
793 f.close()
794 f.close()
795 f.close()
796 self.assertRaises(ValueError, f.flush)
797
Antoine Pitrou328ec742010-09-14 18:37:24 +0000798 def test_RawIOBase_read(self):
799 # Exercise the default RawIOBase.read() implementation (which calls
800 # readinto() internally).
801 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
802 self.assertEqual(rawio.read(2), b"ab")
803 self.assertEqual(rawio.read(2), b"c")
804 self.assertEqual(rawio.read(2), b"d")
805 self.assertEqual(rawio.read(2), None)
806 self.assertEqual(rawio.read(2), b"ef")
807 self.assertEqual(rawio.read(2), b"g")
808 self.assertEqual(rawio.read(2), None)
809 self.assertEqual(rawio.read(2), b"")
810
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400811 def test_types_have_dict(self):
812 test = (
813 self.IOBase(),
814 self.RawIOBase(),
815 self.TextIOBase(),
816 self.StringIO(),
817 self.BytesIO()
818 )
819 for obj in test:
820 self.assertTrue(hasattr(obj, "__dict__"))
821
Ross Lagerwall59142db2011-10-31 20:34:46 +0200822 def test_opener(self):
823 with self.open(support.TESTFN, "w") as f:
824 f.write("egg\n")
825 fd = os.open(support.TESTFN, os.O_RDONLY)
826 def opener(path, flags):
827 return fd
828 with self.open("non-existent", "r", opener=opener) as f:
829 self.assertEqual(f.read(), "egg\n")
830
Barry Warsaw480e2852016-06-08 17:47:26 -0400831 def test_bad_opener_negative_1(self):
832 # Issue #27066.
833 def badopener(fname, flags):
834 return -1
835 with self.assertRaises(ValueError) as cm:
836 open('non-existent', 'r', opener=badopener)
837 self.assertEqual(str(cm.exception), 'opener returned -1')
838
839 def test_bad_opener_other_negative(self):
840 # Issue #27066.
841 def badopener(fname, flags):
842 return -2
843 with self.assertRaises(ValueError) as cm:
844 open('non-existent', 'r', opener=badopener)
845 self.assertEqual(str(cm.exception), 'opener returned -2')
846
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200847 def test_fileio_closefd(self):
848 # Issue #4841
849 with self.open(__file__, 'rb') as f1, \
850 self.open(__file__, 'rb') as f2:
851 fileio = self.FileIO(f1.fileno(), closefd=False)
852 # .__init__() must not close f1
853 fileio.__init__(f2.fileno(), closefd=False)
854 f1.readline()
855 # .close() must not close f2
856 fileio.close()
857 f2.readline()
858
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300859 def test_nonbuffered_textio(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200860 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300861 with self.assertRaises(ValueError):
862 self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300863
864 def test_invalid_newline(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200865 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300866 with self.assertRaises(ValueError):
867 self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300868
Martin Panter6bb91f32016-05-28 00:41:57 +0000869 def test_buffered_readinto_mixin(self):
870 # Test the implementation provided by BufferedIOBase
871 class Stream(self.BufferedIOBase):
872 def read(self, size):
873 return b"12345"
874 read1 = read
875 stream = Stream()
876 for method in ("readinto", "readinto1"):
877 with self.subTest(method):
878 buffer = byteslike(5)
879 self.assertEqual(getattr(stream, method)(buffer), 5)
880 self.assertEqual(bytes(buffer), b"12345")
881
Ethan Furmand62548a2016-06-04 14:38:43 -0700882 def test_fspath_support(self):
883 class PathLike:
884 def __init__(self, path):
885 self.path = path
886
887 def __fspath__(self):
888 return self.path
889
890 def check_path_succeeds(path):
891 with self.open(path, "w") as f:
892 f.write("egg\n")
893
894 with self.open(path, "r") as f:
895 self.assertEqual(f.read(), "egg\n")
896
897 check_path_succeeds(PathLike(support.TESTFN))
898 check_path_succeeds(PathLike(support.TESTFN.encode('utf-8')))
899
900 bad_path = PathLike(TypeError)
Brett Cannonc78ca1e2016-06-24 12:03:43 -0700901 with self.assertRaises(TypeError):
Ethan Furmand62548a2016-06-04 14:38:43 -0700902 self.open(bad_path, 'w')
903
904 # ensure that refcounting is correct with some error conditions
905 with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
906 self.open(PathLike(support.TESTFN), 'rwxa')
907
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200908
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000909class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200910
911 def test_IOBase_finalize(self):
912 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
913 # class which inherits IOBase and an object of this class are caught
914 # in a reference cycle and close() is already in the method cache.
915 class MyIO(self.IOBase):
916 def close(self):
917 pass
918
919 # create an instance to populate the method cache
920 MyIO()
921 obj = MyIO()
922 obj.obj = obj
923 wr = weakref.ref(obj)
924 del MyIO
925 del obj
926 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300927 self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000928
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000929class PyIOTest(IOTest):
930 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000931
Guido van Rossuma9e20242007-03-08 00:43:48 +0000932
Gregory P. Smith1bef9072015-04-14 13:24:34 -0700933@support.cpython_only
934class APIMismatchTest(unittest.TestCase):
Gregory P. Smith054b0652015-04-14 12:58:05 -0700935
Gregory P. Smith054b0652015-04-14 12:58:05 -0700936 def test_RawIOBase_io_in_pyio_match(self):
937 """Test that pyio RawIOBase class has all c RawIOBase methods"""
Antoine Pitrou45d61562015-05-20 21:50:59 +0200938 mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
939 ignore=('__weakref__',))
Gregory P. Smith054b0652015-04-14 12:58:05 -0700940 self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
941
942 def test_RawIOBase_pyio_in_io_match(self):
943 """Test that c RawIOBase class has all pyio RawIOBase methods"""
944 mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
945 self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
946
947
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000948class CommonBufferedTests:
949 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
950
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000951 def test_detach(self):
952 raw = self.MockRawIO()
953 buf = self.tp(raw)
954 self.assertIs(buf.detach(), raw)
955 self.assertRaises(ValueError, buf.detach)
956
Benjamin Peterson10e76b62014-12-21 20:51:50 -0600957 repr(buf) # Should still work
958
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000959 def test_fileno(self):
960 rawio = self.MockRawIO()
961 bufio = self.tp(rawio)
962
Ezio Melottib3aedd42010-11-20 19:04:17 +0000963 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000964
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000965 def test_invalid_args(self):
966 rawio = self.MockRawIO()
967 bufio = self.tp(rawio)
968 # Invalid whence
969 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +0200970 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000971
972 def test_override_destructor(self):
973 tp = self.tp
974 record = []
975 class MyBufferedIO(tp):
976 def __del__(self):
977 record.append(1)
978 try:
979 f = super().__del__
980 except AttributeError:
981 pass
982 else:
983 f()
984 def close(self):
985 record.append(2)
986 super().close()
987 def flush(self):
988 record.append(3)
989 super().flush()
990 rawio = self.MockRawIO()
991 bufio = MyBufferedIO(rawio)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000992 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000993 support.gc_collect()
Martin Panter754aab22016-03-31 07:21:56 +0000994 self.assertEqual(record, [1, 2, 3])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000995
996 def test_context_manager(self):
997 # Test usability as a context manager
998 rawio = self.MockRawIO()
999 bufio = self.tp(rawio)
1000 def _with():
1001 with bufio:
1002 pass
1003 _with()
1004 # bufio should now be closed, and using it a second time should raise
1005 # a ValueError.
1006 self.assertRaises(ValueError, _with)
1007
1008 def test_error_through_destructor(self):
1009 # Test that the exception state is not modified by a destructor,
1010 # even if close() fails.
1011 rawio = self.CloseFailureIO()
1012 def f():
1013 self.tp(rawio).xyzzy
1014 with support.captured_output("stderr") as s:
1015 self.assertRaises(AttributeError, f)
1016 s = s.getvalue().strip()
1017 if s:
1018 # The destructor *may* have printed an unraisable error, check it
1019 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001020 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001021 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +00001022
Antoine Pitrou716c4442009-05-23 19:04:03 +00001023 def test_repr(self):
1024 raw = self.MockRawIO()
1025 b = self.tp(raw)
Serhiy Storchaka521e5862014-07-22 15:00:37 +03001026 clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001027 self.assertEqual(repr(b), "<%s>" % clsname)
1028 raw.name = "dummy"
1029 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
1030 raw.name = b"dummy"
1031 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
1032
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02001033 def test_recursive_repr(self):
1034 # Issue #25455
1035 raw = self.MockRawIO()
1036 b = self.tp(raw)
1037 with support.swap_attr(raw, 'name', b):
1038 try:
1039 repr(b) # Should not crash
1040 except RuntimeError:
1041 pass
1042
Antoine Pitrou6be88762010-05-03 16:48:20 +00001043 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001044 # Test that buffered file is closed despite failed flush
1045 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00001046 raw = self.MockRawIO()
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001047 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00001048 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001049 closed[:] = [b.closed, raw.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001050 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00001051 raw.flush = bad_flush
1052 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001053 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001054 self.assertTrue(b.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001055 self.assertTrue(raw.closed)
1056 self.assertTrue(closed) # flush() called
1057 self.assertFalse(closed[0]) # flush() called before file closed
1058 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02001059 raw.flush = lambda: None # break reference loop
Benjamin Peterson68623612012-12-20 11:53:11 -06001060
1061 def test_close_error_on_close(self):
1062 raw = self.MockRawIO()
1063 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001064 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -06001065 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001066 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -06001067 raw.close = bad_close
1068 b = self.tp(raw)
1069 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001070 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001071 b.close()
1072 self.assertEqual(err.exception.args, ('close',))
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001073 self.assertIsInstance(err.exception.__context__, OSError)
Benjamin Peterson68623612012-12-20 11:53:11 -06001074 self.assertEqual(err.exception.__context__.args, ('flush',))
1075 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00001076
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001077 def test_nonnormalized_close_error_on_close(self):
1078 # Issue #21677
1079 raw = self.MockRawIO()
1080 def bad_flush():
1081 raise non_existing_flush
1082 def bad_close():
1083 raise non_existing_close
1084 raw.close = bad_close
1085 b = self.tp(raw)
1086 b.flush = bad_flush
1087 with self.assertRaises(NameError) as err: # exception not swallowed
1088 b.close()
1089 self.assertIn('non_existing_close', str(err.exception))
1090 self.assertIsInstance(err.exception.__context__, NameError)
1091 self.assertIn('non_existing_flush', str(err.exception.__context__))
1092 self.assertFalse(b.closed)
1093
Antoine Pitrou6be88762010-05-03 16:48:20 +00001094 def test_multi_close(self):
1095 raw = self.MockRawIO()
1096 b = self.tp(raw)
1097 b.close()
1098 b.close()
1099 b.close()
1100 self.assertRaises(ValueError, b.flush)
1101
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001102 def test_unseekable(self):
1103 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1104 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1105 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1106
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00001107 def test_readonly_attributes(self):
1108 raw = self.MockRawIO()
1109 buf = self.tp(raw)
1110 x = self.MockRawIO()
1111 with self.assertRaises(AttributeError):
1112 buf.raw = x
1113
Guido van Rossum78892e42007-04-06 17:31:18 +00001114
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001115class SizeofTest:
1116
1117 @support.cpython_only
1118 def test_sizeof(self):
1119 bufsize1 = 4096
1120 bufsize2 = 8192
1121 rawio = self.MockRawIO()
1122 bufio = self.tp(rawio, buffer_size=bufsize1)
1123 size = sys.getsizeof(bufio) - bufsize1
1124 rawio = self.MockRawIO()
1125 bufio = self.tp(rawio, buffer_size=bufsize2)
1126 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
1127
Jesus Ceadc469452012-10-04 12:37:56 +02001128 @support.cpython_only
1129 def test_buffer_freeing(self) :
1130 bufsize = 4096
1131 rawio = self.MockRawIO()
1132 bufio = self.tp(rawio, buffer_size=bufsize)
1133 size = sys.getsizeof(bufio) - bufsize
1134 bufio.close()
1135 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001136
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001137class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1138 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001139
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001140 def test_constructor(self):
1141 rawio = self.MockRawIO([b"abc"])
1142 bufio = self.tp(rawio)
1143 bufio.__init__(rawio)
1144 bufio.__init__(rawio, buffer_size=1024)
1145 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001146 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001147 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1148 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1149 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1150 rawio = self.MockRawIO([b"abc"])
1151 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001152 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001153
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001154 def test_uninitialized(self):
1155 bufio = self.tp.__new__(self.tp)
1156 del bufio
1157 bufio = self.tp.__new__(self.tp)
1158 self.assertRaisesRegex((ValueError, AttributeError),
1159 'uninitialized|has no attribute',
1160 bufio.read, 0)
1161 bufio.__init__(self.MockRawIO())
1162 self.assertEqual(bufio.read(0), b'')
1163
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001164 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001165 for arg in (None, 7):
1166 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1167 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001168 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001169 # Invalid args
1170 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001171
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001172 def test_read1(self):
1173 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1174 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001175 self.assertEqual(b"a", bufio.read(1))
1176 self.assertEqual(b"b", bufio.read1(1))
1177 self.assertEqual(rawio._reads, 1)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001178 self.assertEqual(b"", bufio.read1(0))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001179 self.assertEqual(b"c", bufio.read1(100))
1180 self.assertEqual(rawio._reads, 1)
1181 self.assertEqual(b"d", bufio.read1(100))
1182 self.assertEqual(rawio._reads, 2)
1183 self.assertEqual(b"efg", bufio.read1(100))
1184 self.assertEqual(rawio._reads, 3)
1185 self.assertEqual(b"", bufio.read1(100))
1186 self.assertEqual(rawio._reads, 4)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001187
1188 def test_read1_arbitrary(self):
1189 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1190 bufio = self.tp(rawio)
1191 self.assertEqual(b"a", bufio.read(1))
1192 self.assertEqual(b"bc", bufio.read1())
1193 self.assertEqual(b"d", bufio.read1())
1194 self.assertEqual(b"efg", bufio.read1(-1))
1195 self.assertEqual(rawio._reads, 3)
1196 self.assertEqual(b"", bufio.read1())
1197 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001198
1199 def test_readinto(self):
1200 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1201 bufio = self.tp(rawio)
1202 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001203 self.assertEqual(bufio.readinto(b), 2)
1204 self.assertEqual(b, b"ab")
1205 self.assertEqual(bufio.readinto(b), 2)
1206 self.assertEqual(b, b"cd")
1207 self.assertEqual(bufio.readinto(b), 2)
1208 self.assertEqual(b, b"ef")
1209 self.assertEqual(bufio.readinto(b), 1)
1210 self.assertEqual(b, b"gf")
1211 self.assertEqual(bufio.readinto(b), 0)
1212 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +02001213 rawio = self.MockRawIO((b"abc", None))
1214 bufio = self.tp(rawio)
1215 self.assertEqual(bufio.readinto(b), 2)
1216 self.assertEqual(b, b"ab")
1217 self.assertEqual(bufio.readinto(b), 1)
1218 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001219
Benjamin Petersona96fea02014-06-22 14:17:44 -07001220 def test_readinto1(self):
1221 buffer_size = 10
1222 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1223 bufio = self.tp(rawio, buffer_size=buffer_size)
1224 b = bytearray(2)
1225 self.assertEqual(bufio.peek(3), b'abc')
1226 self.assertEqual(rawio._reads, 1)
1227 self.assertEqual(bufio.readinto1(b), 2)
1228 self.assertEqual(b, b"ab")
1229 self.assertEqual(rawio._reads, 1)
1230 self.assertEqual(bufio.readinto1(b), 1)
1231 self.assertEqual(b[:1], b"c")
1232 self.assertEqual(rawio._reads, 1)
1233 self.assertEqual(bufio.readinto1(b), 2)
1234 self.assertEqual(b, b"de")
1235 self.assertEqual(rawio._reads, 2)
1236 b = bytearray(2*buffer_size)
1237 self.assertEqual(bufio.peek(3), b'fgh')
1238 self.assertEqual(rawio._reads, 3)
1239 self.assertEqual(bufio.readinto1(b), 6)
1240 self.assertEqual(b[:6], b"fghjkl")
1241 self.assertEqual(rawio._reads, 4)
1242
1243 def test_readinto_array(self):
1244 buffer_size = 60
1245 data = b"a" * 26
1246 rawio = self.MockRawIO((data,))
1247 bufio = self.tp(rawio, buffer_size=buffer_size)
1248
1249 # Create an array with element size > 1 byte
1250 b = array.array('i', b'x' * 32)
1251 assert len(b) != 16
1252
1253 # Read into it. We should get as many *bytes* as we can fit into b
1254 # (which is more than the number of elements)
1255 n = bufio.readinto(b)
1256 self.assertGreater(n, len(b))
1257
1258 # Check that old contents of b are preserved
1259 bm = memoryview(b).cast('B')
1260 self.assertLess(n, len(bm))
1261 self.assertEqual(bm[:n], data[:n])
1262 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1263
1264 def test_readinto1_array(self):
1265 buffer_size = 60
1266 data = b"a" * 26
1267 rawio = self.MockRawIO((data,))
1268 bufio = self.tp(rawio, buffer_size=buffer_size)
1269
1270 # Create an array with element size > 1 byte
1271 b = array.array('i', b'x' * 32)
1272 assert len(b) != 16
1273
1274 # Read into it. We should get as many *bytes* as we can fit into b
1275 # (which is more than the number of elements)
1276 n = bufio.readinto1(b)
1277 self.assertGreater(n, len(b))
1278
1279 # Check that old contents of b are preserved
1280 bm = memoryview(b).cast('B')
1281 self.assertLess(n, len(bm))
1282 self.assertEqual(bm[:n], data[:n])
1283 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1284
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001285 def test_readlines(self):
1286 def bufio():
1287 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1288 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001289 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1290 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1291 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001292
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001293 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001294 data = b"abcdefghi"
1295 dlen = len(data)
1296
1297 tests = [
1298 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1299 [ 100, [ 3, 3, 3], [ dlen ] ],
1300 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1301 ]
1302
1303 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001304 rawio = self.MockFileIO(data)
1305 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001306 pos = 0
1307 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001308 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001309 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001310 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001311 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001312
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001313 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +00001314 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001315 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1316 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001317 self.assertEqual(b"abcd", bufio.read(6))
1318 self.assertEqual(b"e", bufio.read(1))
1319 self.assertEqual(b"fg", bufio.read())
1320 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +02001321 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001322 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +00001323
Victor Stinnera80987f2011-05-25 22:47:16 +02001324 rawio = self.MockRawIO((b"a", None, None))
1325 self.assertEqual(b"a", rawio.readall())
1326 self.assertIsNone(rawio.readall())
1327
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001328 def test_read_past_eof(self):
1329 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1330 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001331
Ezio Melottib3aedd42010-11-20 19:04:17 +00001332 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001333
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001334 def test_read_all(self):
1335 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1336 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001337
Ezio Melottib3aedd42010-11-20 19:04:17 +00001338 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001339
Victor Stinner45df8202010-04-28 22:31:17 +00001340 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001341 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001342 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001343 try:
1344 # Write out many bytes with exactly the same number of 0's,
1345 # 1's... 255's. This will help us check that concurrent reading
1346 # doesn't duplicate or forget contents.
1347 N = 1000
1348 l = list(range(256)) * N
1349 random.shuffle(l)
1350 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001351 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +00001352 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001353 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001354 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001355 errors = []
1356 results = []
1357 def f():
1358 try:
1359 # Intra-buffer read then buffer-flushing read
1360 for n in cycle([1, 19]):
1361 s = bufio.read(n)
1362 if not s:
1363 break
1364 # list.append() is atomic
1365 results.append(s)
1366 except Exception as e:
1367 errors.append(e)
1368 raise
1369 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001370 with support.start_threads(threads):
1371 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001372 self.assertFalse(errors,
1373 "the following exceptions were caught: %r" % errors)
1374 s = b''.join(results)
1375 for i in range(256):
1376 c = bytes(bytearray([i]))
1377 self.assertEqual(s.count(c), N)
1378 finally:
1379 support.unlink(support.TESTFN)
1380
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001381 def test_unseekable(self):
1382 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1383 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1384 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1385 bufio.read(1)
1386 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1387 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1388
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001389 def test_misbehaved_io(self):
1390 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1391 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001392 self.assertRaises(OSError, bufio.seek, 0)
1393 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001394
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001395 def test_no_extraneous_read(self):
1396 # Issue #9550; when the raw IO object has satisfied the read request,
1397 # we should not issue any additional reads, otherwise it may block
1398 # (e.g. socket).
1399 bufsize = 16
1400 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1401 rawio = self.MockRawIO([b"x" * n])
1402 bufio = self.tp(rawio, bufsize)
1403 self.assertEqual(bufio.read(n), b"x" * n)
1404 # Simple case: one raw read is enough to satisfy the request.
1405 self.assertEqual(rawio._extraneous_reads, 0,
1406 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1407 # A more complex case where two raw reads are needed to satisfy
1408 # the request.
1409 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1410 bufio = self.tp(rawio, bufsize)
1411 self.assertEqual(bufio.read(n), b"x" * n)
1412 self.assertEqual(rawio._extraneous_reads, 0,
1413 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1414
Berker Peksagd10d6ae2015-05-12 17:01:05 +03001415 def test_read_on_closed(self):
1416 # Issue #23796
1417 b = io.BufferedReader(io.BytesIO(b"12"))
1418 b.read(1)
1419 b.close()
1420 self.assertRaises(ValueError, b.peek)
1421 self.assertRaises(ValueError, b.read1, 1)
1422
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001423
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001424class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001425 tp = io.BufferedReader
1426
1427 def test_constructor(self):
1428 BufferedReaderTest.test_constructor(self)
1429 # The allocation can succeed on 32-bit builds, e.g. with more
1430 # than 2GB RAM and a 64-bit kernel.
1431 if sys.maxsize > 0x7FFFFFFF:
1432 rawio = self.MockRawIO()
1433 bufio = self.tp(rawio)
1434 self.assertRaises((OverflowError, MemoryError, ValueError),
1435 bufio.__init__, rawio, sys.maxsize)
1436
1437 def test_initialization(self):
1438 rawio = self.MockRawIO([b"abc"])
1439 bufio = self.tp(rawio)
1440 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1441 self.assertRaises(ValueError, bufio.read)
1442 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1443 self.assertRaises(ValueError, bufio.read)
1444 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1445 self.assertRaises(ValueError, bufio.read)
1446
1447 def test_misbehaved_io_read(self):
1448 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1449 bufio = self.tp(rawio)
1450 # _pyio.BufferedReader seems to implement reading different, so that
1451 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001452 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001453
1454 def test_garbage_collection(self):
1455 # C BufferedReader objects are collected.
1456 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001457 with support.check_warnings(('', ResourceWarning)):
1458 rawio = self.FileIO(support.TESTFN, "w+b")
1459 f = self.tp(rawio)
1460 f.f = f
1461 wr = weakref.ref(f)
1462 del f
1463 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001464 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001465
R David Murray67bfe802013-02-23 21:51:05 -05001466 def test_args_error(self):
1467 # Issue #17275
1468 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1469 self.tp(io.BytesIO(), 1024, 1024, 1024)
1470
1471
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001472class PyBufferedReaderTest(BufferedReaderTest):
1473 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001474
Guido van Rossuma9e20242007-03-08 00:43:48 +00001475
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001476class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1477 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001478
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001479 def test_constructor(self):
1480 rawio = self.MockRawIO()
1481 bufio = self.tp(rawio)
1482 bufio.__init__(rawio)
1483 bufio.__init__(rawio, buffer_size=1024)
1484 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001485 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001486 bufio.flush()
1487 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1488 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1489 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1490 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001491 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001492 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001493 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001494
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001495 def test_uninitialized(self):
1496 bufio = self.tp.__new__(self.tp)
1497 del bufio
1498 bufio = self.tp.__new__(self.tp)
1499 self.assertRaisesRegex((ValueError, AttributeError),
1500 'uninitialized|has no attribute',
1501 bufio.write, b'')
1502 bufio.__init__(self.MockRawIO())
1503 self.assertEqual(bufio.write(b''), 0)
1504
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001505 def test_detach_flush(self):
1506 raw = self.MockRawIO()
1507 buf = self.tp(raw)
1508 buf.write(b"howdy!")
1509 self.assertFalse(raw._write_stack)
1510 buf.detach()
1511 self.assertEqual(raw._write_stack, [b"howdy!"])
1512
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001513 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001514 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001515 writer = self.MockRawIO()
1516 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001517 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001518 self.assertFalse(writer._write_stack)
Martin Panter6bb91f32016-05-28 00:41:57 +00001519 buffer = bytearray(b"def")
1520 bufio.write(buffer)
1521 buffer[:] = b"***" # Overwrite our copy of the data
1522 bufio.flush()
1523 self.assertEqual(b"".join(writer._write_stack), b"abcdef")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001524
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001525 def test_write_overflow(self):
1526 writer = self.MockRawIO()
1527 bufio = self.tp(writer, 8)
1528 contents = b"abcdefghijklmnop"
1529 for n in range(0, len(contents), 3):
1530 bufio.write(contents[n:n+3])
1531 flushed = b"".join(writer._write_stack)
1532 # At least (total - 8) bytes were implicitly flushed, perhaps more
1533 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001534 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001535
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001536 def check_writes(self, intermediate_func):
1537 # Lots of writes, test the flushed output is as expected.
1538 contents = bytes(range(256)) * 1000
1539 n = 0
1540 writer = self.MockRawIO()
1541 bufio = self.tp(writer, 13)
1542 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1543 def gen_sizes():
1544 for size in count(1):
1545 for i in range(15):
1546 yield size
1547 sizes = gen_sizes()
1548 while n < len(contents):
1549 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001550 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001551 intermediate_func(bufio)
1552 n += size
1553 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001554 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001555
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001556 def test_writes(self):
1557 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001558
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001559 def test_writes_and_flushes(self):
1560 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001561
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001562 def test_writes_and_seeks(self):
1563 def _seekabs(bufio):
1564 pos = bufio.tell()
1565 bufio.seek(pos + 1, 0)
1566 bufio.seek(pos - 1, 0)
1567 bufio.seek(pos, 0)
1568 self.check_writes(_seekabs)
1569 def _seekrel(bufio):
1570 pos = bufio.seek(0, 1)
1571 bufio.seek(+1, 1)
1572 bufio.seek(-1, 1)
1573 bufio.seek(pos, 0)
1574 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001575
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001576 def test_writes_and_truncates(self):
1577 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001578
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001579 def test_write_non_blocking(self):
1580 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001581 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001582
Ezio Melottib3aedd42010-11-20 19:04:17 +00001583 self.assertEqual(bufio.write(b"abcd"), 4)
1584 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001585 # 1 byte will be written, the rest will be buffered
1586 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001587 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001588
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001589 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1590 raw.block_on(b"0")
1591 try:
1592 bufio.write(b"opqrwxyz0123456789")
1593 except self.BlockingIOError as e:
1594 written = e.characters_written
1595 else:
1596 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001597 self.assertEqual(written, 16)
1598 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001599 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001600
Ezio Melottib3aedd42010-11-20 19:04:17 +00001601 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001602 s = raw.pop_written()
1603 # Previously buffered bytes were flushed
1604 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001605
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001606 def test_write_and_rewind(self):
1607 raw = io.BytesIO()
1608 bufio = self.tp(raw, 4)
1609 self.assertEqual(bufio.write(b"abcdef"), 6)
1610 self.assertEqual(bufio.tell(), 6)
1611 bufio.seek(0, 0)
1612 self.assertEqual(bufio.write(b"XY"), 2)
1613 bufio.seek(6, 0)
1614 self.assertEqual(raw.getvalue(), b"XYcdef")
1615 self.assertEqual(bufio.write(b"123456"), 6)
1616 bufio.flush()
1617 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001618
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001619 def test_flush(self):
1620 writer = self.MockRawIO()
1621 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001622 bufio.write(b"abc")
1623 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001624 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001625
Antoine Pitrou131a4892012-10-16 22:57:11 +02001626 def test_writelines(self):
1627 l = [b'ab', b'cd', b'ef']
1628 writer = self.MockRawIO()
1629 bufio = self.tp(writer, 8)
1630 bufio.writelines(l)
1631 bufio.flush()
1632 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1633
1634 def test_writelines_userlist(self):
1635 l = UserList([b'ab', b'cd', b'ef'])
1636 writer = self.MockRawIO()
1637 bufio = self.tp(writer, 8)
1638 bufio.writelines(l)
1639 bufio.flush()
1640 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1641
1642 def test_writelines_error(self):
1643 writer = self.MockRawIO()
1644 bufio = self.tp(writer, 8)
1645 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1646 self.assertRaises(TypeError, bufio.writelines, None)
1647 self.assertRaises(TypeError, bufio.writelines, 'abc')
1648
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001649 def test_destructor(self):
1650 writer = self.MockRawIO()
1651 bufio = self.tp(writer, 8)
1652 bufio.write(b"abc")
1653 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001654 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001655 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001656
1657 def test_truncate(self):
1658 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001659 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001660 bufio = self.tp(raw, 8)
1661 bufio.write(b"abcdef")
1662 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001663 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001664 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001665 self.assertEqual(f.read(), b"abc")
1666
Victor Stinner45df8202010-04-28 22:31:17 +00001667 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001668 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001669 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001670 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001671 # Write out many bytes from many threads and test they were
1672 # all flushed.
1673 N = 1000
1674 contents = bytes(range(256)) * N
1675 sizes = cycle([1, 19])
1676 n = 0
1677 queue = deque()
1678 while n < len(contents):
1679 size = next(sizes)
1680 queue.append(contents[n:n+size])
1681 n += size
1682 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001683 # We use a real file object because it allows us to
1684 # exercise situations where the GIL is released before
1685 # writing the buffer to the raw streams. This is in addition
1686 # to concurrency issues due to switching threads in the middle
1687 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001688 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001689 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001690 errors = []
1691 def f():
1692 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001693 while True:
1694 try:
1695 s = queue.popleft()
1696 except IndexError:
1697 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001698 bufio.write(s)
1699 except Exception as e:
1700 errors.append(e)
1701 raise
1702 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001703 with support.start_threads(threads):
1704 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001705 self.assertFalse(errors,
1706 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001707 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001708 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001709 s = f.read()
1710 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001711 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001712 finally:
1713 support.unlink(support.TESTFN)
1714
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001715 def test_misbehaved_io(self):
1716 rawio = self.MisbehavedRawIO()
1717 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001718 self.assertRaises(OSError, bufio.seek, 0)
1719 self.assertRaises(OSError, bufio.tell)
1720 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001721
Florent Xicluna109d5732012-07-07 17:03:22 +02001722 def test_max_buffer_size_removal(self):
1723 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001724 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001725
Benjamin Peterson68623612012-12-20 11:53:11 -06001726 def test_write_error_on_close(self):
1727 raw = self.MockRawIO()
1728 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001729 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001730 raw.write = bad_write
1731 b = self.tp(raw)
1732 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001733 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001734 self.assertTrue(b.closed)
1735
Benjamin Peterson59406a92009-03-26 17:10:29 +00001736
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001737class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001738 tp = io.BufferedWriter
1739
1740 def test_constructor(self):
1741 BufferedWriterTest.test_constructor(self)
1742 # The allocation can succeed on 32-bit builds, e.g. with more
1743 # than 2GB RAM and a 64-bit kernel.
1744 if sys.maxsize > 0x7FFFFFFF:
1745 rawio = self.MockRawIO()
1746 bufio = self.tp(rawio)
1747 self.assertRaises((OverflowError, MemoryError, ValueError),
1748 bufio.__init__, rawio, sys.maxsize)
1749
1750 def test_initialization(self):
1751 rawio = self.MockRawIO()
1752 bufio = self.tp(rawio)
1753 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1754 self.assertRaises(ValueError, bufio.write, b"def")
1755 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1756 self.assertRaises(ValueError, bufio.write, b"def")
1757 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1758 self.assertRaises(ValueError, bufio.write, b"def")
1759
1760 def test_garbage_collection(self):
1761 # C BufferedWriter objects are collected, and collecting them flushes
1762 # all data to disk.
1763 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001764 with support.check_warnings(('', ResourceWarning)):
1765 rawio = self.FileIO(support.TESTFN, "w+b")
1766 f = self.tp(rawio)
1767 f.write(b"123xxx")
1768 f.x = f
1769 wr = weakref.ref(f)
1770 del f
1771 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001772 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001773 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001774 self.assertEqual(f.read(), b"123xxx")
1775
R David Murray67bfe802013-02-23 21:51:05 -05001776 def test_args_error(self):
1777 # Issue #17275
1778 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1779 self.tp(io.BytesIO(), 1024, 1024, 1024)
1780
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001781
1782class PyBufferedWriterTest(BufferedWriterTest):
1783 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001784
Guido van Rossum01a27522007-03-07 01:00:12 +00001785class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001786
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001787 def test_constructor(self):
1788 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001789 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001790
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001791 def test_uninitialized(self):
1792 pair = self.tp.__new__(self.tp)
1793 del pair
1794 pair = self.tp.__new__(self.tp)
1795 self.assertRaisesRegex((ValueError, AttributeError),
1796 'uninitialized|has no attribute',
1797 pair.read, 0)
1798 self.assertRaisesRegex((ValueError, AttributeError),
1799 'uninitialized|has no attribute',
1800 pair.write, b'')
1801 pair.__init__(self.MockRawIO(), self.MockRawIO())
1802 self.assertEqual(pair.read(0), b'')
1803 self.assertEqual(pair.write(b''), 0)
1804
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001805 def test_detach(self):
1806 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1807 self.assertRaises(self.UnsupportedOperation, pair.detach)
1808
Florent Xicluna109d5732012-07-07 17:03:22 +02001809 def test_constructor_max_buffer_size_removal(self):
1810 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001811 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001812
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001813 def test_constructor_with_not_readable(self):
1814 class NotReadable(MockRawIO):
1815 def readable(self):
1816 return False
1817
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001818 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001819
1820 def test_constructor_with_not_writeable(self):
1821 class NotWriteable(MockRawIO):
1822 def writable(self):
1823 return False
1824
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001825 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001826
1827 def test_read(self):
1828 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1829
1830 self.assertEqual(pair.read(3), b"abc")
1831 self.assertEqual(pair.read(1), b"d")
1832 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001833 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1834 self.assertEqual(pair.read(None), b"abc")
1835
1836 def test_readlines(self):
1837 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1838 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1839 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1840 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001841
1842 def test_read1(self):
1843 # .read1() is delegated to the underlying reader object, so this test
1844 # can be shallow.
1845 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1846
1847 self.assertEqual(pair.read1(3), b"abc")
Martin Panterccb2c0e2016-10-20 23:48:14 +00001848 self.assertEqual(pair.read1(), b"def")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001849
1850 def test_readinto(self):
Martin Panter6bb91f32016-05-28 00:41:57 +00001851 for method in ("readinto", "readinto1"):
1852 with self.subTest(method):
1853 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001854
Serhiy Storchaka5f1a5182016-09-11 14:41:02 +03001855 data = byteslike(b'\0' * 5)
Martin Panter6bb91f32016-05-28 00:41:57 +00001856 self.assertEqual(getattr(pair, method)(data), 5)
1857 self.assertEqual(bytes(data), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001858
1859 def test_write(self):
1860 w = self.MockRawIO()
1861 pair = self.tp(self.MockRawIO(), w)
1862
1863 pair.write(b"abc")
1864 pair.flush()
Martin Panter6bb91f32016-05-28 00:41:57 +00001865 buffer = bytearray(b"def")
1866 pair.write(buffer)
1867 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001868 pair.flush()
1869 self.assertEqual(w._write_stack, [b"abc", b"def"])
1870
1871 def test_peek(self):
1872 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1873
1874 self.assertTrue(pair.peek(3).startswith(b"abc"))
1875 self.assertEqual(pair.read(3), b"abc")
1876
1877 def test_readable(self):
1878 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1879 self.assertTrue(pair.readable())
1880
1881 def test_writeable(self):
1882 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1883 self.assertTrue(pair.writable())
1884
1885 def test_seekable(self):
1886 # BufferedRWPairs are never seekable, even if their readers and writers
1887 # are.
1888 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1889 self.assertFalse(pair.seekable())
1890
1891 # .flush() is delegated to the underlying writer object and has been
1892 # tested in the test_write method.
1893
1894 def test_close_and_closed(self):
1895 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1896 self.assertFalse(pair.closed)
1897 pair.close()
1898 self.assertTrue(pair.closed)
1899
Serhiy Storchaka7665be62015-03-24 23:21:57 +02001900 def test_reader_close_error_on_close(self):
1901 def reader_close():
1902 reader_non_existing
1903 reader = self.MockRawIO()
1904 reader.close = reader_close
1905 writer = self.MockRawIO()
1906 pair = self.tp(reader, writer)
1907 with self.assertRaises(NameError) as err:
1908 pair.close()
1909 self.assertIn('reader_non_existing', str(err.exception))
1910 self.assertTrue(pair.closed)
1911 self.assertFalse(reader.closed)
1912 self.assertTrue(writer.closed)
1913
1914 def test_writer_close_error_on_close(self):
1915 def writer_close():
1916 writer_non_existing
1917 reader = self.MockRawIO()
1918 writer = self.MockRawIO()
1919 writer.close = writer_close
1920 pair = self.tp(reader, writer)
1921 with self.assertRaises(NameError) as err:
1922 pair.close()
1923 self.assertIn('writer_non_existing', str(err.exception))
1924 self.assertFalse(pair.closed)
1925 self.assertTrue(reader.closed)
1926 self.assertFalse(writer.closed)
1927
1928 def test_reader_writer_close_error_on_close(self):
1929 def reader_close():
1930 reader_non_existing
1931 def writer_close():
1932 writer_non_existing
1933 reader = self.MockRawIO()
1934 reader.close = reader_close
1935 writer = self.MockRawIO()
1936 writer.close = writer_close
1937 pair = self.tp(reader, writer)
1938 with self.assertRaises(NameError) as err:
1939 pair.close()
1940 self.assertIn('reader_non_existing', str(err.exception))
1941 self.assertIsInstance(err.exception.__context__, NameError)
1942 self.assertIn('writer_non_existing', str(err.exception.__context__))
1943 self.assertFalse(pair.closed)
1944 self.assertFalse(reader.closed)
1945 self.assertFalse(writer.closed)
1946
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001947 def test_isatty(self):
1948 class SelectableIsAtty(MockRawIO):
1949 def __init__(self, isatty):
1950 MockRawIO.__init__(self)
1951 self._isatty = isatty
1952
1953 def isatty(self):
1954 return self._isatty
1955
1956 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1957 self.assertFalse(pair.isatty())
1958
1959 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1960 self.assertTrue(pair.isatty())
1961
1962 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1963 self.assertTrue(pair.isatty())
1964
1965 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1966 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001967
Benjamin Petersonbbd0a322014-09-29 22:46:57 -04001968 def test_weakref_clearing(self):
1969 brw = self.tp(self.MockRawIO(), self.MockRawIO())
1970 ref = weakref.ref(brw)
1971 brw = None
1972 ref = None # Shouldn't segfault.
1973
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001974class CBufferedRWPairTest(BufferedRWPairTest):
1975 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001976
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001977class PyBufferedRWPairTest(BufferedRWPairTest):
1978 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001979
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001980
1981class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1982 read_mode = "rb+"
1983 write_mode = "wb+"
1984
1985 def test_constructor(self):
1986 BufferedReaderTest.test_constructor(self)
1987 BufferedWriterTest.test_constructor(self)
1988
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001989 def test_uninitialized(self):
1990 BufferedReaderTest.test_uninitialized(self)
1991 BufferedWriterTest.test_uninitialized(self)
1992
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001993 def test_read_and_write(self):
1994 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001995 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001996
1997 self.assertEqual(b"as", rw.read(2))
1998 rw.write(b"ddd")
1999 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002000 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002001 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002002 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00002003
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002004 def test_seek_and_tell(self):
2005 raw = self.BytesIO(b"asdfghjkl")
2006 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00002007
Ezio Melottib3aedd42010-11-20 19:04:17 +00002008 self.assertEqual(b"as", rw.read(2))
2009 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002010 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002011 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00002012
Antoine Pitroue05565e2011-08-20 14:39:23 +02002013 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00002014 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02002015 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002016 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002017 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002018 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002019 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002020 self.assertEqual(7, rw.tell())
2021 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02002022 rw.flush()
2023 self.assertEqual(b"asdf123fl", raw.getvalue())
2024
Christian Heimes8e42a0a2007-11-08 18:04:45 +00002025 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00002026
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002027 def check_flush_and_read(self, read_func):
2028 raw = self.BytesIO(b"abcdefghi")
2029 bufio = self.tp(raw)
2030
Ezio Melottib3aedd42010-11-20 19:04:17 +00002031 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002032 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002033 self.assertEqual(b"ef", read_func(bufio, 2))
2034 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002035 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002036 self.assertEqual(6, bufio.tell())
2037 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002038 raw.seek(0, 0)
2039 raw.write(b"XYZ")
2040 # flush() resets the read buffer
2041 bufio.flush()
2042 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002043 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002044
2045 def test_flush_and_read(self):
2046 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2047
2048 def test_flush_and_readinto(self):
2049 def _readinto(bufio, n=-1):
2050 b = bytearray(n if n >= 0 else 9999)
2051 n = bufio.readinto(b)
2052 return bytes(b[:n])
2053 self.check_flush_and_read(_readinto)
2054
2055 def test_flush_and_peek(self):
2056 def _peek(bufio, n=-1):
2057 # This relies on the fact that the buffer can contain the whole
2058 # raw stream, otherwise peek() can return less.
2059 b = bufio.peek(n)
2060 if n != -1:
2061 b = b[:n]
2062 bufio.seek(len(b), 1)
2063 return b
2064 self.check_flush_and_read(_peek)
2065
2066 def test_flush_and_write(self):
2067 raw = self.BytesIO(b"abcdefghi")
2068 bufio = self.tp(raw)
2069
2070 bufio.write(b"123")
2071 bufio.flush()
2072 bufio.write(b"45")
2073 bufio.flush()
2074 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002075 self.assertEqual(b"12345fghi", raw.getvalue())
2076 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002077
2078 def test_threads(self):
2079 BufferedReaderTest.test_threads(self)
2080 BufferedWriterTest.test_threads(self)
2081
2082 def test_writes_and_peek(self):
2083 def _peek(bufio):
2084 bufio.peek(1)
2085 self.check_writes(_peek)
2086 def _peek(bufio):
2087 pos = bufio.tell()
2088 bufio.seek(-1, 1)
2089 bufio.peek(1)
2090 bufio.seek(pos, 0)
2091 self.check_writes(_peek)
2092
2093 def test_writes_and_reads(self):
2094 def _read(bufio):
2095 bufio.seek(-1, 1)
2096 bufio.read(1)
2097 self.check_writes(_read)
2098
2099 def test_writes_and_read1s(self):
2100 def _read1(bufio):
2101 bufio.seek(-1, 1)
2102 bufio.read1(1)
2103 self.check_writes(_read1)
2104
2105 def test_writes_and_readintos(self):
2106 def _read(bufio):
2107 bufio.seek(-1, 1)
2108 bufio.readinto(bytearray(1))
2109 self.check_writes(_read)
2110
Antoine Pitroua0ceb732009-08-06 20:29:56 +00002111 def test_write_after_readahead(self):
2112 # Issue #6629: writing after the buffer was filled by readahead should
2113 # first rewind the raw stream.
2114 for overwrite_size in [1, 5]:
2115 raw = self.BytesIO(b"A" * 10)
2116 bufio = self.tp(raw, 4)
2117 # Trigger readahead
2118 self.assertEqual(bufio.read(1), b"A")
2119 self.assertEqual(bufio.tell(), 1)
2120 # Overwriting should rewind the raw stream if it needs so
2121 bufio.write(b"B" * overwrite_size)
2122 self.assertEqual(bufio.tell(), overwrite_size + 1)
2123 # If the write size was smaller than the buffer size, flush() and
2124 # check that rewind happens.
2125 bufio.flush()
2126 self.assertEqual(bufio.tell(), overwrite_size + 1)
2127 s = raw.getvalue()
2128 self.assertEqual(s,
2129 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2130
Antoine Pitrou7c404892011-05-13 00:13:33 +02002131 def test_write_rewind_write(self):
2132 # Various combinations of reading / writing / seeking backwards / writing again
2133 def mutate(bufio, pos1, pos2):
2134 assert pos2 >= pos1
2135 # Fill the buffer
2136 bufio.seek(pos1)
2137 bufio.read(pos2 - pos1)
2138 bufio.write(b'\x02')
2139 # This writes earlier than the previous write, but still inside
2140 # the buffer.
2141 bufio.seek(pos1)
2142 bufio.write(b'\x01')
2143
2144 b = b"\x80\x81\x82\x83\x84"
2145 for i in range(0, len(b)):
2146 for j in range(i, len(b)):
2147 raw = self.BytesIO(b)
2148 bufio = self.tp(raw, 100)
2149 mutate(bufio, i, j)
2150 bufio.flush()
2151 expected = bytearray(b)
2152 expected[j] = 2
2153 expected[i] = 1
2154 self.assertEqual(raw.getvalue(), expected,
2155 "failed result for i=%d, j=%d" % (i, j))
2156
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00002157 def test_truncate_after_read_or_write(self):
2158 raw = self.BytesIO(b"A" * 10)
2159 bufio = self.tp(raw, 100)
2160 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2161 self.assertEqual(bufio.truncate(), 2)
2162 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2163 self.assertEqual(bufio.truncate(), 4)
2164
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002165 def test_misbehaved_io(self):
2166 BufferedReaderTest.test_misbehaved_io(self)
2167 BufferedWriterTest.test_misbehaved_io(self)
2168
Antoine Pitroue05565e2011-08-20 14:39:23 +02002169 def test_interleaved_read_write(self):
2170 # Test for issue #12213
2171 with self.BytesIO(b'abcdefgh') as raw:
2172 with self.tp(raw, 100) as f:
2173 f.write(b"1")
2174 self.assertEqual(f.read(1), b'b')
2175 f.write(b'2')
2176 self.assertEqual(f.read1(1), b'd')
2177 f.write(b'3')
2178 buf = bytearray(1)
2179 f.readinto(buf)
2180 self.assertEqual(buf, b'f')
2181 f.write(b'4')
2182 self.assertEqual(f.peek(1), b'h')
2183 f.flush()
2184 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2185
2186 with self.BytesIO(b'abc') as raw:
2187 with self.tp(raw, 100) as f:
2188 self.assertEqual(f.read(1), b'a')
2189 f.write(b"2")
2190 self.assertEqual(f.read(1), b'c')
2191 f.flush()
2192 self.assertEqual(raw.getvalue(), b'a2c')
2193
2194 def test_interleaved_readline_write(self):
2195 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2196 with self.tp(raw) as f:
2197 f.write(b'1')
2198 self.assertEqual(f.readline(), b'b\n')
2199 f.write(b'2')
2200 self.assertEqual(f.readline(), b'def\n')
2201 f.write(b'3')
2202 self.assertEqual(f.readline(), b'\n')
2203 f.flush()
2204 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2205
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002206 # You can't construct a BufferedRandom over a non-seekable stream.
2207 test_unseekable = None
2208
R David Murray67bfe802013-02-23 21:51:05 -05002209
Antoine Pitrou10f0c502012-07-29 19:02:46 +02002210class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002211 tp = io.BufferedRandom
2212
2213 def test_constructor(self):
2214 BufferedRandomTest.test_constructor(self)
2215 # The allocation can succeed on 32-bit builds, e.g. with more
2216 # than 2GB RAM and a 64-bit kernel.
2217 if sys.maxsize > 0x7FFFFFFF:
2218 rawio = self.MockRawIO()
2219 bufio = self.tp(rawio)
2220 self.assertRaises((OverflowError, MemoryError, ValueError),
2221 bufio.__init__, rawio, sys.maxsize)
2222
2223 def test_garbage_collection(self):
2224 CBufferedReaderTest.test_garbage_collection(self)
2225 CBufferedWriterTest.test_garbage_collection(self)
2226
R David Murray67bfe802013-02-23 21:51:05 -05002227 def test_args_error(self):
2228 # Issue #17275
2229 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
2230 self.tp(io.BytesIO(), 1024, 1024, 1024)
2231
2232
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002233class PyBufferedRandomTest(BufferedRandomTest):
2234 tp = pyio.BufferedRandom
2235
2236
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002237# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2238# properties:
2239# - A single output character can correspond to many bytes of input.
2240# - The number of input bytes to complete the character can be
2241# undetermined until the last input byte is received.
2242# - The number of input bytes can vary depending on previous input.
2243# - A single input byte can correspond to many characters of output.
2244# - The number of output characters can be undetermined until the
2245# last input byte is received.
2246# - The number of output characters can vary depending on previous input.
2247
2248class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
2249 """
2250 For testing seek/tell behavior with a stateful, buffering decoder.
2251
2252 Input is a sequence of words. Words may be fixed-length (length set
2253 by input) or variable-length (period-terminated). In variable-length
2254 mode, extra periods are ignored. Possible words are:
2255 - 'i' followed by a number sets the input length, I (maximum 99).
2256 When I is set to 0, words are space-terminated.
2257 - 'o' followed by a number sets the output length, O (maximum 99).
2258 - Any other word is converted into a word followed by a period on
2259 the output. The output word consists of the input word truncated
2260 or padded out with hyphens to make its length equal to O. If O
2261 is 0, the word is output verbatim without truncating or padding.
2262 I and O are initially set to 1. When I changes, any buffered input is
2263 re-scanned according to the new I. EOF also terminates the last word.
2264 """
2265
2266 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00002267 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002268 self.reset()
2269
2270 def __repr__(self):
2271 return '<SID %x>' % id(self)
2272
2273 def reset(self):
2274 self.i = 1
2275 self.o = 1
2276 self.buffer = bytearray()
2277
2278 def getstate(self):
2279 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
2280 return bytes(self.buffer), i*100 + o
2281
2282 def setstate(self, state):
2283 buffer, io = state
2284 self.buffer = bytearray(buffer)
2285 i, o = divmod(io, 100)
2286 self.i, self.o = i ^ 1, o ^ 1
2287
2288 def decode(self, input, final=False):
2289 output = ''
2290 for b in input:
2291 if self.i == 0: # variable-length, terminated with period
2292 if b == ord('.'):
2293 if self.buffer:
2294 output += self.process_word()
2295 else:
2296 self.buffer.append(b)
2297 else: # fixed-length, terminate after self.i bytes
2298 self.buffer.append(b)
2299 if len(self.buffer) == self.i:
2300 output += self.process_word()
2301 if final and self.buffer: # EOF terminates the last word
2302 output += self.process_word()
2303 return output
2304
2305 def process_word(self):
2306 output = ''
2307 if self.buffer[0] == ord('i'):
2308 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2309 elif self.buffer[0] == ord('o'):
2310 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2311 else:
2312 output = self.buffer.decode('ascii')
2313 if len(output) < self.o:
2314 output += '-'*self.o # pad out with hyphens
2315 if self.o:
2316 output = output[:self.o] # truncate to output length
2317 output += '.'
2318 self.buffer = bytearray()
2319 return output
2320
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002321 codecEnabled = False
2322
2323 @classmethod
2324 def lookupTestDecoder(cls, name):
2325 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00002326 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002327 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00002328 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002329 incrementalencoder=None,
2330 streamreader=None, streamwriter=None,
2331 incrementaldecoder=cls)
2332
2333# Register the previous decoder for testing.
2334# Disabled by default, tests will enable it.
2335codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2336
2337
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002338class StatefulIncrementalDecoderTest(unittest.TestCase):
2339 """
2340 Make sure the StatefulIncrementalDecoder actually works.
2341 """
2342
2343 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002344 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002345 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002346 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002347 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002348 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002349 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002350 # I=0, O=6 (variable-length input, fixed-length output)
2351 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2352 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002353 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002354 # I=6, O=3 (fixed-length input > fixed-length output)
2355 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2356 # I=0, then 3; O=29, then 15 (with longer output)
2357 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2358 'a----------------------------.' +
2359 'b----------------------------.' +
2360 'cde--------------------------.' +
2361 'abcdefghijabcde.' +
2362 'a.b------------.' +
2363 '.c.------------.' +
2364 'd.e------------.' +
2365 'k--------------.' +
2366 'l--------------.' +
2367 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002368 ]
2369
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002370 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002371 # Try a few one-shot test cases.
2372 for input, eof, output in self.test_cases:
2373 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002374 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002375
2376 # Also test an unfinished decode, followed by forcing EOF.
2377 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002378 self.assertEqual(d.decode(b'oiabcd'), '')
2379 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002380
2381class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002382
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002383 def setUp(self):
2384 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2385 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002386 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002387
Guido van Rossumd0712812007-04-11 16:32:43 +00002388 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002389 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002390
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002391 def test_constructor(self):
2392 r = self.BytesIO(b"\xc3\xa9\n\n")
2393 b = self.BufferedReader(r, 1000)
2394 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002395 t.__init__(b, encoding="latin-1", newline="\r\n")
2396 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002397 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002398 t.__init__(b, encoding="utf-8", line_buffering=True)
2399 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002400 self.assertEqual(t.line_buffering, True)
2401 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002402 self.assertRaises(TypeError, t.__init__, b, newline=42)
2403 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2404
Serhiy Storchakaf24131f2015-04-16 11:19:43 +03002405 def test_uninitialized(self):
2406 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2407 del t
2408 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2409 self.assertRaises(Exception, repr, t)
2410 self.assertRaisesRegex((ValueError, AttributeError),
2411 'uninitialized|has no attribute',
2412 t.read, 0)
2413 t.__init__(self.MockRawIO())
2414 self.assertEqual(t.read(0), '')
2415
Nick Coghlana9b15242014-02-04 22:11:18 +10002416 def test_non_text_encoding_codecs_are_rejected(self):
2417 # Ensure the constructor complains if passed a codec that isn't
2418 # marked as a text encoding
2419 # http://bugs.python.org/issue20404
2420 r = self.BytesIO()
2421 b = self.BufferedWriter(r)
2422 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2423 self.TextIOWrapper(b, encoding="hex")
2424
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002425 def test_detach(self):
2426 r = self.BytesIO()
2427 b = self.BufferedWriter(r)
2428 t = self.TextIOWrapper(b)
2429 self.assertIs(t.detach(), b)
2430
2431 t = self.TextIOWrapper(b, encoding="ascii")
2432 t.write("howdy")
2433 self.assertFalse(r.getvalue())
2434 t.detach()
2435 self.assertEqual(r.getvalue(), b"howdy")
2436 self.assertRaises(ValueError, t.detach)
2437
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002438 # Operations independent of the detached stream should still work
2439 repr(t)
2440 self.assertEqual(t.encoding, "ascii")
2441 self.assertEqual(t.errors, "strict")
2442 self.assertFalse(t.line_buffering)
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002443 self.assertFalse(t.write_through)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002444
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002445 def test_repr(self):
2446 raw = self.BytesIO("hello".encode("utf-8"))
2447 b = self.BufferedReader(raw)
2448 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002449 modname = self.TextIOWrapper.__module__
2450 self.assertEqual(repr(t),
2451 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2452 raw.name = "dummy"
2453 self.assertEqual(repr(t),
2454 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002455 t.mode = "r"
2456 self.assertEqual(repr(t),
2457 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002458 raw.name = b"dummy"
2459 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002460 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002461
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002462 t.buffer.detach()
2463 repr(t) # Should not raise an exception
2464
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02002465 def test_recursive_repr(self):
2466 # Issue #25455
2467 raw = self.BytesIO()
2468 t = self.TextIOWrapper(raw)
2469 with support.swap_attr(raw, 'name', t):
2470 try:
2471 repr(t) # Should not crash
2472 except RuntimeError:
2473 pass
2474
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002475 def test_line_buffering(self):
2476 r = self.BytesIO()
2477 b = self.BufferedWriter(r, 1000)
2478 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002479 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002480 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002481 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002482 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002483 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002484 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002485
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002486 def test_reconfigure_line_buffering(self):
2487 r = self.BytesIO()
2488 b = self.BufferedWriter(r, 1000)
2489 t = self.TextIOWrapper(b, newline="\n", line_buffering=False)
2490 t.write("AB\nC")
2491 self.assertEqual(r.getvalue(), b"")
2492
2493 t.reconfigure(line_buffering=True) # implicit flush
2494 self.assertEqual(r.getvalue(), b"AB\nC")
2495 t.write("DEF\nG")
2496 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2497 t.write("H")
2498 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2499 t.reconfigure(line_buffering=False) # implicit flush
2500 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2501 t.write("IJ")
2502 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2503
2504 # Keeping default value
2505 t.reconfigure()
2506 t.reconfigure(line_buffering=None)
2507 self.assertEqual(t.line_buffering, False)
2508 t.reconfigure(line_buffering=True)
2509 t.reconfigure()
2510 t.reconfigure(line_buffering=None)
2511 self.assertEqual(t.line_buffering, True)
2512
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002513 def test_default_encoding(self):
2514 old_environ = dict(os.environ)
2515 try:
2516 # try to get a user preferred encoding different than the current
2517 # locale encoding to check that TextIOWrapper() uses the current
2518 # locale encoding and not the user preferred encoding
2519 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2520 if key in os.environ:
2521 del os.environ[key]
2522
2523 current_locale_encoding = locale.getpreferredencoding(False)
2524 b = self.BytesIO()
2525 t = self.TextIOWrapper(b)
2526 self.assertEqual(t.encoding, current_locale_encoding)
2527 finally:
2528 os.environ.clear()
2529 os.environ.update(old_environ)
2530
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002531 @support.cpython_only
Serhiy Storchaka78980432013-01-15 01:12:17 +02002532 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002533 # Issue 15989
2534 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002535 b = self.BytesIO()
2536 b.fileno = lambda: _testcapi.INT_MAX + 1
2537 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2538 b.fileno = lambda: _testcapi.UINT_MAX + 1
2539 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2540
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002541 def test_encoding(self):
2542 # Check the encoding attribute is always set, and valid
2543 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002544 t = self.TextIOWrapper(b, encoding="utf-8")
2545 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002546 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002547 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002548 codecs.lookup(t.encoding)
2549
2550 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002551 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002552 b = self.BytesIO(b"abc\n\xff\n")
2553 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002554 self.assertRaises(UnicodeError, t.read)
2555 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002556 b = self.BytesIO(b"abc\n\xff\n")
2557 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002558 self.assertRaises(UnicodeError, t.read)
2559 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002560 b = self.BytesIO(b"abc\n\xff\n")
2561 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002562 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002563 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002564 b = self.BytesIO(b"abc\n\xff\n")
2565 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002566 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002567
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002568 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002569 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002570 b = self.BytesIO()
2571 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002572 self.assertRaises(UnicodeError, t.write, "\xff")
2573 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002574 b = self.BytesIO()
2575 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002576 self.assertRaises(UnicodeError, t.write, "\xff")
2577 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002578 b = self.BytesIO()
2579 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002580 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002581 t.write("abc\xffdef\n")
2582 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002583 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002584 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002585 b = self.BytesIO()
2586 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002587 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002588 t.write("abc\xffdef\n")
2589 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002590 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002591
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002592 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002593 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2594
2595 tests = [
2596 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002597 [ '', input_lines ],
2598 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2599 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2600 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002601 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002602 encodings = (
2603 'utf-8', 'latin-1',
2604 'utf-16', 'utf-16-le', 'utf-16-be',
2605 'utf-32', 'utf-32-le', 'utf-32-be',
2606 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002607
Guido van Rossum8358db22007-08-18 21:39:55 +00002608 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002609 # character in TextIOWrapper._pending_line.
2610 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002611 # XXX: str.encode() should return bytes
2612 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002613 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002614 for bufsize in range(1, 10):
2615 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002616 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2617 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002618 encoding=encoding)
2619 if do_reads:
2620 got_lines = []
2621 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002622 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002623 if c2 == '':
2624 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002625 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002626 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002627 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002628 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002629
2630 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002631 self.assertEqual(got_line, exp_line)
2632 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002633
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002634 def test_newlines_input(self):
2635 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002636 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2637 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002638 (None, normalized.decode("ascii").splitlines(keepends=True)),
2639 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002640 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2641 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2642 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002643 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002644 buf = self.BytesIO(testdata)
2645 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002646 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002647 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002648 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002649
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002650 def test_newlines_output(self):
2651 testdict = {
2652 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2653 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2654 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2655 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2656 }
2657 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2658 for newline, expected in tests:
2659 buf = self.BytesIO()
2660 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2661 txt.write("AAA\nB")
2662 txt.write("BB\nCCC\n")
2663 txt.write("X\rY\r\nZ")
2664 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002665 self.assertEqual(buf.closed, False)
2666 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002667
2668 def test_destructor(self):
2669 l = []
2670 base = self.BytesIO
2671 class MyBytesIO(base):
2672 def close(self):
2673 l.append(self.getvalue())
2674 base.close(self)
2675 b = MyBytesIO()
2676 t = self.TextIOWrapper(b, encoding="ascii")
2677 t.write("abc")
2678 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002679 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002680 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002681
2682 def test_override_destructor(self):
2683 record = []
2684 class MyTextIO(self.TextIOWrapper):
2685 def __del__(self):
2686 record.append(1)
2687 try:
2688 f = super().__del__
2689 except AttributeError:
2690 pass
2691 else:
2692 f()
2693 def close(self):
2694 record.append(2)
2695 super().close()
2696 def flush(self):
2697 record.append(3)
2698 super().flush()
2699 b = self.BytesIO()
2700 t = MyTextIO(b, encoding="ascii")
2701 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002702 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002703 self.assertEqual(record, [1, 2, 3])
2704
2705 def test_error_through_destructor(self):
2706 # Test that the exception state is not modified by a destructor,
2707 # even if close() fails.
2708 rawio = self.CloseFailureIO()
2709 def f():
2710 self.TextIOWrapper(rawio).xyzzy
2711 with support.captured_output("stderr") as s:
2712 self.assertRaises(AttributeError, f)
2713 s = s.getvalue().strip()
2714 if s:
2715 # The destructor *may* have printed an unraisable error, check it
2716 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002717 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002718 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002719
Guido van Rossum9b76da62007-04-11 01:09:03 +00002720 # Systematic tests of the text I/O API
2721
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002722 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002723 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002724 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002725 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002726 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002727 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002728 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002729 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002730 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002731 self.assertEqual(f.tell(), 0)
2732 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002733 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002734 self.assertEqual(f.seek(0), 0)
2735 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002736 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002737 self.assertEqual(f.read(2), "ab")
2738 self.assertEqual(f.read(1), "c")
2739 self.assertEqual(f.read(1), "")
2740 self.assertEqual(f.read(), "")
2741 self.assertEqual(f.tell(), cookie)
2742 self.assertEqual(f.seek(0), 0)
2743 self.assertEqual(f.seek(0, 2), cookie)
2744 self.assertEqual(f.write("def"), 3)
2745 self.assertEqual(f.seek(cookie), cookie)
2746 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002747 if enc.startswith("utf"):
2748 self.multi_line_test(f, enc)
2749 f.close()
2750
2751 def multi_line_test(self, f, enc):
2752 f.seek(0)
2753 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002754 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002755 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002756 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002757 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002758 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002759 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002760 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002761 wlines.append((f.tell(), line))
2762 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002763 f.seek(0)
2764 rlines = []
2765 while True:
2766 pos = f.tell()
2767 line = f.readline()
2768 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002769 break
2770 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002771 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002772
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002773 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002774 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002775 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002776 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002777 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002778 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002779 p2 = f.tell()
2780 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002781 self.assertEqual(f.tell(), p0)
2782 self.assertEqual(f.readline(), "\xff\n")
2783 self.assertEqual(f.tell(), p1)
2784 self.assertEqual(f.readline(), "\xff\n")
2785 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002786 f.seek(0)
2787 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002788 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002789 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002790 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002791 f.close()
2792
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002793 def test_seeking(self):
2794 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002795 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002796 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002797 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002798 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002799 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002800 suffix = bytes(u_suffix.encode("utf-8"))
2801 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002802 with self.open(support.TESTFN, "wb") as f:
2803 f.write(line*2)
2804 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2805 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002806 self.assertEqual(s, str(prefix, "ascii"))
2807 self.assertEqual(f.tell(), prefix_size)
2808 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002809
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002810 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002811 # Regression test for a specific bug
2812 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002813 with self.open(support.TESTFN, "wb") as f:
2814 f.write(data)
2815 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2816 f._CHUNK_SIZE # Just test that it exists
2817 f._CHUNK_SIZE = 2
2818 f.readline()
2819 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002820
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002821 def test_seek_and_tell(self):
2822 #Test seek/tell using the StatefulIncrementalDecoder.
2823 # Make test faster by doing smaller seeks
2824 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002825
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002826 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002827 """Tell/seek to various points within a data stream and ensure
2828 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002829 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002830 f.write(data)
2831 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002832 f = self.open(support.TESTFN, encoding='test_decoder')
2833 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002834 decoded = f.read()
2835 f.close()
2836
Neal Norwitze2b07052008-03-18 19:52:05 +00002837 for i in range(min_pos, len(decoded) + 1): # seek positions
2838 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002839 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002840 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002841 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002842 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002843 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002844 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002845 f.close()
2846
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002847 # Enable the test decoder.
2848 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002849
2850 # Run the tests.
2851 try:
2852 # Try each test case.
2853 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002854 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002855
2856 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002857 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2858 offset = CHUNK_SIZE - len(input)//2
2859 prefix = b'.'*offset
2860 # Don't bother seeking into the prefix (takes too long).
2861 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002862 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002863
2864 # Ensure our test decoder won't interfere with subsequent tests.
2865 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002866 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002867
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002868 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002869 data = "1234567890"
2870 tests = ("utf-16",
2871 "utf-16-le",
2872 "utf-16-be",
2873 "utf-32",
2874 "utf-32-le",
2875 "utf-32-be")
2876 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002877 buf = self.BytesIO()
2878 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002879 # Check if the BOM is written only once (see issue1753).
2880 f.write(data)
2881 f.write(data)
2882 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002883 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002884 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002885 self.assertEqual(f.read(), data * 2)
2886 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002887
Benjamin Petersona1b49012009-03-31 23:11:32 +00002888 def test_unreadable(self):
2889 class UnReadable(self.BytesIO):
2890 def readable(self):
2891 return False
2892 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002893 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002894
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002895 def test_read_one_by_one(self):
2896 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002897 reads = ""
2898 while True:
2899 c = txt.read(1)
2900 if not c:
2901 break
2902 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002903 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002904
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002905 def test_readlines(self):
2906 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2907 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2908 txt.seek(0)
2909 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2910 txt.seek(0)
2911 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2912
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002913 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002914 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002915 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002916 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002917 reads = ""
2918 while True:
2919 c = txt.read(128)
2920 if not c:
2921 break
2922 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002923 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002924
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002925 def test_writelines(self):
2926 l = ['ab', 'cd', 'ef']
2927 buf = self.BytesIO()
2928 txt = self.TextIOWrapper(buf)
2929 txt.writelines(l)
2930 txt.flush()
2931 self.assertEqual(buf.getvalue(), b'abcdef')
2932
2933 def test_writelines_userlist(self):
2934 l = UserList(['ab', 'cd', 'ef'])
2935 buf = self.BytesIO()
2936 txt = self.TextIOWrapper(buf)
2937 txt.writelines(l)
2938 txt.flush()
2939 self.assertEqual(buf.getvalue(), b'abcdef')
2940
2941 def test_writelines_error(self):
2942 txt = self.TextIOWrapper(self.BytesIO())
2943 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2944 self.assertRaises(TypeError, txt.writelines, None)
2945 self.assertRaises(TypeError, txt.writelines, b'abc')
2946
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002947 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002948 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002949
2950 # read one char at a time
2951 reads = ""
2952 while True:
2953 c = txt.read(1)
2954 if not c:
2955 break
2956 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002957 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002958
2959 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002960 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002961 txt._CHUNK_SIZE = 4
2962
2963 reads = ""
2964 while True:
2965 c = txt.read(4)
2966 if not c:
2967 break
2968 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002969 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002970
2971 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002972 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002973 txt._CHUNK_SIZE = 4
2974
2975 reads = txt.read(4)
2976 reads += txt.read(4)
2977 reads += txt.readline()
2978 reads += txt.readline()
2979 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002980 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002981
2982 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002983 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002984 txt._CHUNK_SIZE = 4
2985
2986 reads = txt.read(4)
2987 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002988 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002989
2990 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002991 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002992 txt._CHUNK_SIZE = 4
2993
2994 reads = txt.read(4)
2995 pos = txt.tell()
2996 txt.seek(0)
2997 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002998 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002999
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003000 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003001 buffer = self.BytesIO(self.testdata)
3002 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003003
3004 self.assertEqual(buffer.seekable(), txt.seekable())
3005
Antoine Pitroue4501852009-05-14 18:55:55 +00003006 def test_append_bom(self):
3007 # The BOM is not written again when appending to a non-empty file
3008 filename = support.TESTFN
3009 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3010 with self.open(filename, 'w', encoding=charset) as f:
3011 f.write('aaa')
3012 pos = f.tell()
3013 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003014 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003015
3016 with self.open(filename, 'a', encoding=charset) as f:
3017 f.write('xxx')
3018 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003019 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003020
3021 def test_seek_bom(self):
3022 # Same test, but when seeking manually
3023 filename = support.TESTFN
3024 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3025 with self.open(filename, 'w', encoding=charset) as f:
3026 f.write('aaa')
3027 pos = f.tell()
3028 with self.open(filename, 'r+', encoding=charset) as f:
3029 f.seek(pos)
3030 f.write('zzz')
3031 f.seek(0)
3032 f.write('bbb')
3033 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003034 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003035
Antoine Pitrou85e3ee72015-04-13 20:01:21 +02003036 def test_seek_append_bom(self):
3037 # Same test, but first seek to the start and then to the end
3038 filename = support.TESTFN
3039 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3040 with self.open(filename, 'w', encoding=charset) as f:
3041 f.write('aaa')
3042 with self.open(filename, 'a', encoding=charset) as f:
3043 f.seek(0)
3044 f.seek(0, self.SEEK_END)
3045 f.write('xxx')
3046 with self.open(filename, 'rb') as f:
3047 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3048
Benjamin Peterson0926ad12009-06-06 18:02:12 +00003049 def test_errors_property(self):
3050 with self.open(support.TESTFN, "w") as f:
3051 self.assertEqual(f.errors, "strict")
3052 with self.open(support.TESTFN, "w", errors="replace") as f:
3053 self.assertEqual(f.errors, "replace")
3054
Brett Cannon31f59292011-02-21 19:29:56 +00003055 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00003056 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003057 def test_threads_write(self):
3058 # Issue6750: concurrent writes could duplicate data
3059 event = threading.Event()
3060 with self.open(support.TESTFN, "w", buffering=1) as f:
3061 def run(n):
3062 text = "Thread%03d\n" % n
3063 event.wait()
3064 f.write(text)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003065 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003066 for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003067 with support.start_threads(threads, event.set):
3068 time.sleep(0.02)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003069 with self.open(support.TESTFN) as f:
3070 content = f.read()
3071 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003072 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003073
Antoine Pitrou6be88762010-05-03 16:48:20 +00003074 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003075 # Test that text file is closed despite failed flush
3076 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00003077 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003078 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00003079 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003080 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003081 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00003082 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003083 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06003084 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003085 self.assertTrue(txt.buffer.closed)
3086 self.assertTrue(closed) # flush() called
3087 self.assertFalse(closed[0]) # flush() called before file closed
3088 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02003089 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003090
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003091 def test_close_error_on_close(self):
3092 buffer = self.BytesIO(self.testdata)
3093 def bad_flush():
3094 raise OSError('flush')
3095 def bad_close():
3096 raise OSError('close')
3097 buffer.close = bad_close
3098 txt = self.TextIOWrapper(buffer, encoding="ascii")
3099 txt.flush = bad_flush
3100 with self.assertRaises(OSError) as err: # exception not swallowed
3101 txt.close()
3102 self.assertEqual(err.exception.args, ('close',))
3103 self.assertIsInstance(err.exception.__context__, OSError)
3104 self.assertEqual(err.exception.__context__.args, ('flush',))
3105 self.assertFalse(txt.closed)
3106
3107 def test_nonnormalized_close_error_on_close(self):
3108 # Issue #21677
3109 buffer = self.BytesIO(self.testdata)
3110 def bad_flush():
3111 raise non_existing_flush
3112 def bad_close():
3113 raise non_existing_close
3114 buffer.close = bad_close
3115 txt = self.TextIOWrapper(buffer, encoding="ascii")
3116 txt.flush = bad_flush
3117 with self.assertRaises(NameError) as err: # exception not swallowed
3118 txt.close()
3119 self.assertIn('non_existing_close', str(err.exception))
3120 self.assertIsInstance(err.exception.__context__, NameError)
3121 self.assertIn('non_existing_flush', str(err.exception.__context__))
3122 self.assertFalse(txt.closed)
3123
Antoine Pitrou6be88762010-05-03 16:48:20 +00003124 def test_multi_close(self):
3125 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3126 txt.close()
3127 txt.close()
3128 txt.close()
3129 self.assertRaises(ValueError, txt.flush)
3130
Antoine Pitrou0d739d72010-09-05 23:01:12 +00003131 def test_unseekable(self):
3132 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
3133 self.assertRaises(self.UnsupportedOperation, txt.tell)
3134 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3135
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003136 def test_readonly_attributes(self):
3137 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3138 buf = self.BytesIO(self.testdata)
3139 with self.assertRaises(AttributeError):
3140 txt.buffer = buf
3141
Antoine Pitroue96ec682011-07-23 21:46:35 +02003142 def test_rawio(self):
3143 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3144 # that subprocess.Popen() can have the required unbuffered
3145 # semantics with universal_newlines=True.
3146 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3147 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3148 # Reads
3149 self.assertEqual(txt.read(4), 'abcd')
3150 self.assertEqual(txt.readline(), 'efghi\n')
3151 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3152
3153 def test_rawio_write_through(self):
3154 # Issue #12591: with write_through=True, writes don't need a flush
3155 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3156 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3157 write_through=True)
3158 txt.write('1')
3159 txt.write('23\n4')
3160 txt.write('5')
3161 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3162
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003163 def test_bufio_write_through(self):
3164 # Issue #21396: write_through=True doesn't force a flush()
3165 # on the underlying binary buffered object.
3166 flush_called, write_called = [], []
3167 class BufferedWriter(self.BufferedWriter):
3168 def flush(self, *args, **kwargs):
3169 flush_called.append(True)
3170 return super().flush(*args, **kwargs)
3171 def write(self, *args, **kwargs):
3172 write_called.append(True)
3173 return super().write(*args, **kwargs)
3174
3175 rawio = self.BytesIO()
3176 data = b"a"
3177 bufio = BufferedWriter(rawio, len(data)*2)
3178 textio = self.TextIOWrapper(bufio, encoding='ascii',
3179 write_through=True)
3180 # write to the buffered io but don't overflow the buffer
3181 text = data.decode('ascii')
3182 textio.write(text)
3183
3184 # buffer.flush is not called with write_through=True
3185 self.assertFalse(flush_called)
3186 # buffer.write *is* called with write_through=True
3187 self.assertTrue(write_called)
3188 self.assertEqual(rawio.getvalue(), b"") # no flush
3189
3190 write_called = [] # reset
3191 textio.write(text * 10) # total content is larger than bufio buffer
3192 self.assertTrue(write_called)
3193 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3194
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02003195 def test_reconfigure_write_through(self):
3196 raw = self.MockRawIO([])
3197 t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3198 t.write('1')
3199 t.reconfigure(write_through=True) # implied flush
3200 self.assertEqual(t.write_through, True)
3201 self.assertEqual(b''.join(raw._write_stack), b'1')
3202 t.write('23')
3203 self.assertEqual(b''.join(raw._write_stack), b'123')
3204 t.reconfigure(write_through=False)
3205 self.assertEqual(t.write_through, False)
3206 t.write('45')
3207 t.flush()
3208 self.assertEqual(b''.join(raw._write_stack), b'12345')
3209 # Keeping default value
3210 t.reconfigure()
3211 t.reconfigure(write_through=None)
3212 self.assertEqual(t.write_through, False)
3213 t.reconfigure(write_through=True)
3214 t.reconfigure()
3215 t.reconfigure(write_through=None)
3216 self.assertEqual(t.write_through, True)
3217
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003218 def test_read_nonbytes(self):
3219 # Issue #17106
3220 # Crash when underlying read() returns non-bytes
3221 t = self.TextIOWrapper(self.StringIO('a'))
3222 self.assertRaises(TypeError, t.read, 1)
3223 t = self.TextIOWrapper(self.StringIO('a'))
3224 self.assertRaises(TypeError, t.readline)
3225 t = self.TextIOWrapper(self.StringIO('a'))
3226 self.assertRaises(TypeError, t.read)
3227
Oren Milmana5b4ea12017-08-25 21:14:54 +03003228 def test_illegal_encoder(self):
3229 # Issue 31271: Calling write() while the return value of encoder's
3230 # encode() is invalid shouldn't cause an assertion failure.
3231 rot13 = codecs.lookup("rot13")
3232 with support.swap_attr(rot13, '_is_text_encoding', True):
3233 t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
3234 self.assertRaises(TypeError, t.write, 'bar')
3235
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003236 def test_illegal_decoder(self):
3237 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10003238 # Bypass the early encoding check added in issue 20404
3239 def _make_illegal_wrapper():
3240 quopri = codecs.lookup("quopri")
3241 quopri._is_text_encoding = True
3242 try:
3243 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3244 newline='\n', encoding="quopri")
3245 finally:
3246 quopri._is_text_encoding = False
3247 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003248 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10003249 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003250 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10003251 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003252 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10003253 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003254 self.assertRaises(TypeError, t.read)
3255
Oren Milmanba7d7362017-08-29 11:58:27 +03003256 # Issue 31243: calling read() while the return value of decoder's
3257 # getstate() is invalid should neither crash the interpreter nor
3258 # raise a SystemError.
3259 def _make_very_illegal_wrapper(getstate_ret_val):
3260 class BadDecoder:
3261 def getstate(self):
3262 return getstate_ret_val
3263 def _get_bad_decoder(dummy):
3264 return BadDecoder()
3265 quopri = codecs.lookup("quopri")
3266 with support.swap_attr(quopri, 'incrementaldecoder',
3267 _get_bad_decoder):
3268 return _make_illegal_wrapper()
3269 t = _make_very_illegal_wrapper(42)
3270 self.assertRaises(TypeError, t.read, 42)
3271 t = _make_very_illegal_wrapper(())
3272 self.assertRaises(TypeError, t.read, 42)
3273 t = _make_very_illegal_wrapper((1, 2))
3274 self.assertRaises(TypeError, t.read, 42)
3275
Antoine Pitrou712cb732013-12-21 15:51:54 +01003276 def _check_create_at_shutdown(self, **kwargs):
3277 # Issue #20037: creating a TextIOWrapper at shutdown
3278 # shouldn't crash the interpreter.
3279 iomod = self.io.__name__
3280 code = """if 1:
3281 import codecs
3282 import {iomod} as io
3283
3284 # Avoid looking up codecs at shutdown
3285 codecs.lookup('utf-8')
3286
3287 class C:
3288 def __init__(self):
3289 self.buf = io.BytesIO()
3290 def __del__(self):
3291 io.TextIOWrapper(self.buf, **{kwargs})
3292 print("ok")
3293 c = C()
3294 """.format(iomod=iomod, kwargs=kwargs)
3295 return assert_python_ok("-c", code)
3296
Serhiy Storchakaa7930372016-07-03 22:27:26 +03003297 @support.requires_type_collecting
Antoine Pitrou712cb732013-12-21 15:51:54 +01003298 def test_create_at_shutdown_without_encoding(self):
3299 rc, out, err = self._check_create_at_shutdown()
3300 if err:
3301 # Can error out with a RuntimeError if the module state
3302 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10003303 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01003304 else:
3305 self.assertEqual("ok", out.decode().strip())
3306
Serhiy Storchakaa7930372016-07-03 22:27:26 +03003307 @support.requires_type_collecting
Antoine Pitrou712cb732013-12-21 15:51:54 +01003308 def test_create_at_shutdown_with_encoding(self):
3309 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3310 errors='strict')
3311 self.assertFalse(err)
3312 self.assertEqual("ok", out.decode().strip())
3313
Antoine Pitroub8503892014-04-29 10:14:02 +02003314 def test_read_byteslike(self):
3315 r = MemviewBytesIO(b'Just some random string\n')
3316 t = self.TextIOWrapper(r, 'utf-8')
3317
3318 # TextIOwrapper will not read the full string, because
3319 # we truncate it to a multiple of the native int size
3320 # so that we can construct a more complex memoryview.
3321 bytes_val = _to_memoryview(r.getvalue()).tobytes()
3322
3323 self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3324
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003325 def test_issue22849(self):
3326 class F(object):
3327 def readable(self): return True
3328 def writable(self): return True
3329 def seekable(self): return True
3330
3331 for i in range(10):
3332 try:
3333 self.TextIOWrapper(F(), encoding='utf-8')
3334 except Exception:
3335 pass
3336
3337 F.tell = lambda x: 0
3338 t = self.TextIOWrapper(F(), encoding='utf-8')
3339
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003340
Antoine Pitroub8503892014-04-29 10:14:02 +02003341class MemviewBytesIO(io.BytesIO):
3342 '''A BytesIO object whose read method returns memoryviews
3343 rather than bytes'''
3344
3345 def read1(self, len_):
3346 return _to_memoryview(super().read1(len_))
3347
3348 def read(self, len_):
3349 return _to_memoryview(super().read(len_))
3350
3351def _to_memoryview(buf):
3352 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3353
3354 arr = array.array('i')
3355 idx = len(buf) - len(buf) % arr.itemsize
3356 arr.frombytes(buf[:idx])
3357 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003358
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003359
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003360class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003361 io = io
Steve Dowerfe8f4c92016-09-07 09:31:52 -07003362 shutdown_error = "RuntimeError: could not find io module state"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003363
3364 def test_initialization(self):
3365 r = self.BytesIO(b"\xc3\xa9\n\n")
3366 b = self.BufferedReader(r, 1000)
3367 t = self.TextIOWrapper(b)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003368 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
3369 self.assertRaises(ValueError, t.read)
3370
Benjamin Peterson10e76b62014-12-21 20:51:50 -06003371 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
3372 self.assertRaises(Exception, repr, t)
3373
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003374 def test_garbage_collection(self):
3375 # C TextIOWrapper objects are collected, and collecting them flushes
3376 # all data to disk.
3377 # The Python version has __del__, so it ends in gc.garbage instead.
Antoine Pitrou796564c2013-07-30 19:59:21 +02003378 with support.check_warnings(('', ResourceWarning)):
3379 rawio = io.FileIO(support.TESTFN, "wb")
3380 b = self.BufferedWriter(rawio)
3381 t = self.TextIOWrapper(b, encoding="ascii")
3382 t.write("456def")
3383 t.x = t
3384 wr = weakref.ref(t)
3385 del t
3386 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003387 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00003388 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003389 self.assertEqual(f.read(), b"456def")
3390
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003391 def test_rwpair_cleared_before_textio(self):
3392 # Issue 13070: TextIOWrapper's finalization would crash when called
3393 # after the reference to the underlying BufferedRWPair's writer got
3394 # cleared by the GC.
3395 for i in range(1000):
3396 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3397 t1 = self.TextIOWrapper(b1, encoding="ascii")
3398 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3399 t2 = self.TextIOWrapper(b2, encoding="ascii")
3400 # circular references
3401 t1.buddy = t2
3402 t2.buddy = t1
3403 support.gc_collect()
3404
3405
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003406class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003407 io = pyio
Steve Dower6d5cf8a2016-09-09 09:21:01 -07003408 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003409
3410
3411class IncrementalNewlineDecoderTest(unittest.TestCase):
3412
3413 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003414 # UTF-8 specific tests for a newline decoder
3415 def _check_decode(b, s, **kwargs):
3416 # We exercise getstate() / setstate() as well as decode()
3417 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003418 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003419 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003420 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003421
Antoine Pitrou180a3362008-12-14 16:36:46 +00003422 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003423
Antoine Pitrou180a3362008-12-14 16:36:46 +00003424 _check_decode(b'\xe8', "")
3425 _check_decode(b'\xa2', "")
3426 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003427
Antoine Pitrou180a3362008-12-14 16:36:46 +00003428 _check_decode(b'\xe8', "")
3429 _check_decode(b'\xa2', "")
3430 _check_decode(b'\x88', "\u8888")
3431
3432 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003433 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3434
Antoine Pitrou180a3362008-12-14 16:36:46 +00003435 decoder.reset()
3436 _check_decode(b'\n', "\n")
3437 _check_decode(b'\r', "")
3438 _check_decode(b'', "\n", final=True)
3439 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003440
Antoine Pitrou180a3362008-12-14 16:36:46 +00003441 _check_decode(b'\r', "")
3442 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003443
Antoine Pitrou180a3362008-12-14 16:36:46 +00003444 _check_decode(b'\r\r\n', "\n\n")
3445 _check_decode(b'\r', "")
3446 _check_decode(b'\r', "\n")
3447 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003448
Antoine Pitrou180a3362008-12-14 16:36:46 +00003449 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3450 _check_decode(b'\xe8\xa2\x88', "\u8888")
3451 _check_decode(b'\n', "\n")
3452 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3453 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003454
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003455 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003456 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003457 if encoding is not None:
3458 encoder = codecs.getincrementalencoder(encoding)()
3459 def _decode_bytewise(s):
3460 # Decode one byte at a time
3461 for b in encoder.encode(s):
3462 result.append(decoder.decode(bytes([b])))
3463 else:
3464 encoder = None
3465 def _decode_bytewise(s):
3466 # Decode one char at a time
3467 for c in s:
3468 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003469 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003470 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003471 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003472 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003473 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003474 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003475 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003476 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003477 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003478 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003479 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003480 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003481 input = "abc"
3482 if encoder is not None:
3483 encoder.reset()
3484 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003485 self.assertEqual(decoder.decode(input), "abc")
3486 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003487
3488 def test_newline_decoder(self):
3489 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003490 # None meaning the IncrementalNewlineDecoder takes unicode input
3491 # rather than bytes input
3492 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003493 'utf-16', 'utf-16-le', 'utf-16-be',
3494 'utf-32', 'utf-32-le', 'utf-32-be',
3495 )
3496 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003497 decoder = enc and codecs.getincrementaldecoder(enc)()
3498 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3499 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003500 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003501 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3502 self.check_newline_decoding_utf8(decoder)
Oren Milman1d1d3e92017-08-20 18:35:36 +03003503 self.assertRaises(TypeError, decoder.setstate, 42)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003504
Antoine Pitrou66913e22009-03-06 23:40:56 +00003505 def test_newline_bytes(self):
3506 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3507 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003508 self.assertEqual(dec.newlines, None)
3509 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3510 self.assertEqual(dec.newlines, None)
3511 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3512 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003513 dec = self.IncrementalNewlineDecoder(None, translate=False)
3514 _check(dec)
3515 dec = self.IncrementalNewlineDecoder(None, translate=True)
3516 _check(dec)
3517
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003518class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3519 pass
3520
3521class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3522 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003523
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003524
Guido van Rossum01a27522007-03-07 01:00:12 +00003525# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003526
Guido van Rossum5abbf752007-08-27 17:39:33 +00003527class MiscIOTest(unittest.TestCase):
3528
Barry Warsaw40e82462008-11-20 20:14:50 +00003529 def tearDown(self):
3530 support.unlink(support.TESTFN)
3531
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003532 def test___all__(self):
3533 for name in self.io.__all__:
3534 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003535 self.assertIsNotNone(obj, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00003536 if name == "open":
3537 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003538 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003539 self.assertTrue(issubclass(obj, Exception), name)
3540 elif not name.startswith("SEEK_"):
3541 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003542
Barry Warsaw40e82462008-11-20 20:14:50 +00003543 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003544 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003545 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003546 f.close()
3547
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003548 with support.check_warnings(('', DeprecationWarning)):
3549 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003550 self.assertEqual(f.name, support.TESTFN)
3551 self.assertEqual(f.buffer.name, support.TESTFN)
3552 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3553 self.assertEqual(f.mode, "U")
3554 self.assertEqual(f.buffer.mode, "rb")
3555 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003556 f.close()
3557
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003558 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003559 self.assertEqual(f.mode, "w+")
3560 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3561 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003562
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003563 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003564 self.assertEqual(g.mode, "wb")
3565 self.assertEqual(g.raw.mode, "wb")
3566 self.assertEqual(g.name, f.fileno())
3567 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003568 f.close()
3569 g.close()
3570
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003571 def test_io_after_close(self):
3572 for kwargs in [
3573 {"mode": "w"},
3574 {"mode": "wb"},
3575 {"mode": "w", "buffering": 1},
3576 {"mode": "w", "buffering": 2},
3577 {"mode": "wb", "buffering": 0},
3578 {"mode": "r"},
3579 {"mode": "rb"},
3580 {"mode": "r", "buffering": 1},
3581 {"mode": "r", "buffering": 2},
3582 {"mode": "rb", "buffering": 0},
3583 {"mode": "w+"},
3584 {"mode": "w+b"},
3585 {"mode": "w+", "buffering": 1},
3586 {"mode": "w+", "buffering": 2},
3587 {"mode": "w+b", "buffering": 0},
3588 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003589 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003590 f.close()
3591 self.assertRaises(ValueError, f.flush)
3592 self.assertRaises(ValueError, f.fileno)
3593 self.assertRaises(ValueError, f.isatty)
3594 self.assertRaises(ValueError, f.__iter__)
3595 if hasattr(f, "peek"):
3596 self.assertRaises(ValueError, f.peek, 1)
3597 self.assertRaises(ValueError, f.read)
3598 if hasattr(f, "read1"):
3599 self.assertRaises(ValueError, f.read1, 1024)
Martin Panterccb2c0e2016-10-20 23:48:14 +00003600 self.assertRaises(ValueError, f.read1)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003601 if hasattr(f, "readall"):
3602 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003603 if hasattr(f, "readinto"):
3604 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003605 if hasattr(f, "readinto1"):
3606 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003607 self.assertRaises(ValueError, f.readline)
3608 self.assertRaises(ValueError, f.readlines)
Xiang Zhang026435c2017-04-15 12:47:28 +08003609 self.assertRaises(ValueError, f.readlines, 1)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003610 self.assertRaises(ValueError, f.seek, 0)
3611 self.assertRaises(ValueError, f.tell)
3612 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003613 self.assertRaises(ValueError, f.write,
3614 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003615 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003616 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003617
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003618 def test_blockingioerror(self):
3619 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003620 class C(str):
3621 pass
3622 c = C("")
3623 b = self.BlockingIOError(1, c)
3624 c.b = b
3625 b.c = c
3626 wr = weakref.ref(c)
3627 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003628 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003629 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003630
3631 def test_abcs(self):
3632 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003633 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3634 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3635 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3636 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003637
3638 def _check_abc_inheritance(self, abcmodule):
3639 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003640 self.assertIsInstance(f, abcmodule.IOBase)
3641 self.assertIsInstance(f, abcmodule.RawIOBase)
3642 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3643 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003644 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003645 self.assertIsInstance(f, abcmodule.IOBase)
3646 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3647 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3648 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003649 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003650 self.assertIsInstance(f, abcmodule.IOBase)
3651 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3652 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3653 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003654
3655 def test_abc_inheritance(self):
3656 # Test implementations inherit from their respective ABCs
3657 self._check_abc_inheritance(self)
3658
3659 def test_abc_inheritance_official(self):
3660 # Test implementations inherit from the official ABCs of the
3661 # baseline "io" module.
3662 self._check_abc_inheritance(io)
3663
Antoine Pitroue033e062010-10-29 10:38:18 +00003664 def _check_warn_on_dealloc(self, *args, **kwargs):
3665 f = open(*args, **kwargs)
3666 r = repr(f)
3667 with self.assertWarns(ResourceWarning) as cm:
3668 f = None
3669 support.gc_collect()
3670 self.assertIn(r, str(cm.warning.args[0]))
3671
3672 def test_warn_on_dealloc(self):
3673 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3674 self._check_warn_on_dealloc(support.TESTFN, "wb")
3675 self._check_warn_on_dealloc(support.TESTFN, "w")
3676
3677 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3678 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003679 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003680 for fd in fds:
3681 try:
3682 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003683 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003684 if e.errno != errno.EBADF:
3685 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003686 self.addCleanup(cleanup_fds)
3687 r, w = os.pipe()
3688 fds += r, w
3689 self._check_warn_on_dealloc(r, *args, **kwargs)
3690 # When using closefd=False, there's no warning
3691 r, w = os.pipe()
3692 fds += r, w
Serhiy Storchaka94a619d2016-02-11 13:11:44 +02003693 with support.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00003694 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00003695
3696 def test_warn_on_dealloc_fd(self):
3697 self._check_warn_on_dealloc_fd("rb", buffering=0)
3698 self._check_warn_on_dealloc_fd("rb")
3699 self._check_warn_on_dealloc_fd("r")
3700
3701
Antoine Pitrou243757e2010-11-05 21:15:39 +00003702 def test_pickling(self):
3703 # Pickling file objects is forbidden
3704 for kwargs in [
3705 {"mode": "w"},
3706 {"mode": "wb"},
3707 {"mode": "wb", "buffering": 0},
3708 {"mode": "r"},
3709 {"mode": "rb"},
3710 {"mode": "rb", "buffering": 0},
3711 {"mode": "w+"},
3712 {"mode": "w+b"},
3713 {"mode": "w+b", "buffering": 0},
3714 ]:
3715 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3716 with self.open(support.TESTFN, **kwargs) as f:
3717 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3718
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003719 def test_nonblock_pipe_write_bigbuf(self):
3720 self._test_nonblock_pipe_write(16*1024)
3721
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003722 def test_nonblock_pipe_write_smallbuf(self):
3723 self._test_nonblock_pipe_write(1024)
3724
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003725 @unittest.skipUnless(hasattr(os, 'set_blocking'),
3726 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003727 def _test_nonblock_pipe_write(self, bufsize):
3728 sent = []
3729 received = []
3730 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003731 os.set_blocking(r, False)
3732 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003733
3734 # To exercise all code paths in the C implementation we need
3735 # to play with buffer sizes. For instance, if we choose a
3736 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3737 # then we will never get a partial write of the buffer.
3738 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3739 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3740
3741 with rf, wf:
3742 for N in 9999, 73, 7574:
3743 try:
3744 i = 0
3745 while True:
3746 msg = bytes([i % 26 + 97]) * N
3747 sent.append(msg)
3748 wf.write(msg)
3749 i += 1
3750
3751 except self.BlockingIOError as e:
3752 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003753 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003754 sent[-1] = sent[-1][:e.characters_written]
3755 received.append(rf.read())
3756 msg = b'BLOCKED'
3757 wf.write(msg)
3758 sent.append(msg)
3759
3760 while True:
3761 try:
3762 wf.flush()
3763 break
3764 except self.BlockingIOError as e:
3765 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003766 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003767 self.assertEqual(e.characters_written, 0)
3768 received.append(rf.read())
3769
3770 received += iter(rf.read, None)
3771
3772 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003773 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003774 self.assertTrue(wf.closed)
3775 self.assertTrue(rf.closed)
3776
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003777 def test_create_fail(self):
3778 # 'x' mode fails if file is existing
3779 with self.open(support.TESTFN, 'w'):
3780 pass
3781 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3782
3783 def test_create_writes(self):
3784 # 'x' mode opens for writing
3785 with self.open(support.TESTFN, 'xb') as f:
3786 f.write(b"spam")
3787 with self.open(support.TESTFN, 'rb') as f:
3788 self.assertEqual(b"spam", f.read())
3789
Christian Heimes7b648752012-09-10 14:48:43 +02003790 def test_open_allargs(self):
3791 # there used to be a buffer overflow in the parser for rawmode
3792 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3793
3794
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003795class CMiscIOTest(MiscIOTest):
3796 io = io
3797
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03003798 def test_readinto_buffer_overflow(self):
3799 # Issue #18025
3800 class BadReader(self.io.BufferedIOBase):
3801 def read(self, n=-1):
3802 return b'x' * 10**6
3803 bufio = BadReader()
3804 b = bytearray(2)
3805 self.assertRaises(ValueError, bufio.readinto, b)
3806
Antoine Pitrou25f85d42015-04-13 19:41:47 +02003807 @unittest.skipUnless(threading, 'Threading required for this test.')
3808 def check_daemon_threads_shutdown_deadlock(self, stream_name):
3809 # Issue #23309: deadlocks at shutdown should be avoided when a
3810 # daemon thread and the main thread both write to a file.
3811 code = """if 1:
3812 import sys
3813 import time
3814 import threading
Victor Stinner2a1aed02017-04-21 17:59:23 +02003815 from test.support import SuppressCrashReport
Antoine Pitrou25f85d42015-04-13 19:41:47 +02003816
3817 file = sys.{stream_name}
3818
3819 def run():
3820 while True:
3821 file.write('.')
3822 file.flush()
3823
Victor Stinner2a1aed02017-04-21 17:59:23 +02003824 crash = SuppressCrashReport()
3825 crash.__enter__()
3826 # don't call __exit__(): the crash occurs at Python shutdown
3827
Antoine Pitrou25f85d42015-04-13 19:41:47 +02003828 thread = threading.Thread(target=run)
3829 thread.daemon = True
3830 thread.start()
3831
3832 time.sleep(0.5)
3833 file.write('!')
3834 file.flush()
3835 """.format_map(locals())
3836 res, _ = run_python_until_end("-c", code)
3837 err = res.err.decode()
3838 if res.rc != 0:
3839 # Failure: should be a fatal error
3840 self.assertIn("Fatal Python error: could not acquire lock "
3841 "for <_io.BufferedWriter name='<{stream_name}>'> "
3842 "at interpreter shutdown, possibly due to "
3843 "daemon threads".format_map(locals()),
3844 err)
3845 else:
3846 self.assertFalse(err.strip('.!'))
3847
3848 def test_daemon_threads_shutdown_stdout_deadlock(self):
3849 self.check_daemon_threads_shutdown_deadlock('stdout')
3850
3851 def test_daemon_threads_shutdown_stderr_deadlock(self):
3852 self.check_daemon_threads_shutdown_deadlock('stderr')
3853
3854
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003855class PyMiscIOTest(MiscIOTest):
3856 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003857
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003858
3859@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3860class SignalsTest(unittest.TestCase):
3861
3862 def setUp(self):
3863 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3864
3865 def tearDown(self):
3866 signal.signal(signal.SIGALRM, self.oldalrm)
3867
3868 def alarm_interrupt(self, sig, frame):
3869 1/0
3870
3871 @unittest.skipUnless(threading, 'Threading required for this test.')
3872 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3873 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00003874 invokes the signal handler, and bubbles up the exception raised
3875 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003876 read_results = []
3877 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02003878 if hasattr(signal, 'pthread_sigmask'):
3879 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003880 s = os.read(r, 1)
3881 read_results.append(s)
3882 t = threading.Thread(target=_read)
3883 t.daemon = True
3884 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00003885 fdopen_kwargs["closefd"] = False
Victor Stinner9c182a12015-03-16 18:03:06 +01003886 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003887 try:
3888 wio = self.io.open(w, **fdopen_kwargs)
3889 t.start()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003890 # Fill the pipe enough that the write will be blocking.
3891 # It will be interrupted by the timer armed above. Since the
3892 # other thread has read one byte, the low-level write will
3893 # return with a successful (partial) result rather than an EINTR.
3894 # The buffered IO layer must check for pending signal
3895 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02003896 signal.alarm(1)
3897 try:
Victor Stinner9c182a12015-03-16 18:03:06 +01003898 self.assertRaises(ZeroDivisionError, wio.write, large_data)
Victor Stinner775b2dd2013-07-15 19:53:13 +02003899 finally:
3900 signal.alarm(0)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003901 t.join()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003902 # We got one byte, get another one and check that it isn't a
3903 # repeat of the first one.
3904 read_results.append(os.read(r, 1))
3905 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3906 finally:
3907 os.close(w)
3908 os.close(r)
3909 # This is deliberate. If we didn't close the file descriptor
3910 # before closing wio, wio would try to flush its internal
3911 # buffer, and block again.
3912 try:
3913 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003914 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003915 if e.errno != errno.EBADF:
3916 raise
3917
3918 def test_interrupted_write_unbuffered(self):
3919 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3920
3921 def test_interrupted_write_buffered(self):
3922 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3923
Victor Stinner6ab72862014-09-03 23:32:28 +02003924 # Issue #22331: The test hangs on FreeBSD 7.2
3925 @support.requires_freebsd_version(8)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003926 def test_interrupted_write_text(self):
3927 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3928
Brett Cannon31f59292011-02-21 19:29:56 +00003929 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003930 def check_reentrant_write(self, data, **fdopen_kwargs):
3931 def on_alarm(*args):
3932 # Will be called reentrantly from the same thread
3933 wio.write(data)
3934 1/0
3935 signal.signal(signal.SIGALRM, on_alarm)
3936 r, w = os.pipe()
3937 wio = self.io.open(w, **fdopen_kwargs)
3938 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003939 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003940 # Either the reentrant call to wio.write() fails with RuntimeError,
3941 # or the signal handler raises ZeroDivisionError.
3942 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3943 while 1:
3944 for i in range(100):
3945 wio.write(data)
3946 wio.flush()
3947 # Make sure the buffer doesn't fill up and block further writes
3948 os.read(r, len(data) * 100)
3949 exc = cm.exception
3950 if isinstance(exc, RuntimeError):
3951 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3952 finally:
3953 wio.close()
3954 os.close(r)
3955
3956 def test_reentrant_write_buffered(self):
3957 self.check_reentrant_write(b"xy", mode="wb")
3958
3959 def test_reentrant_write_text(self):
3960 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3961
Antoine Pitrou707ce822011-02-25 21:24:11 +00003962 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3963 """Check that a buffered read, when it gets interrupted (either
3964 returning a partial result or EINTR), properly invokes the signal
3965 handler and retries if the latter returned successfully."""
3966 r, w = os.pipe()
3967 fdopen_kwargs["closefd"] = False
3968 def alarm_handler(sig, frame):
3969 os.write(w, b"bar")
3970 signal.signal(signal.SIGALRM, alarm_handler)
3971 try:
3972 rio = self.io.open(r, **fdopen_kwargs)
3973 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003974 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003975 # Expected behaviour:
3976 # - first raw read() returns partial b"foo"
3977 # - second raw read() returns EINTR
3978 # - third raw read() returns b"bar"
3979 self.assertEqual(decode(rio.read(6)), "foobar")
3980 finally:
3981 rio.close()
3982 os.close(w)
3983 os.close(r)
3984
Antoine Pitrou20db5112011-08-19 20:32:34 +02003985 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003986 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3987 mode="rb")
3988
Antoine Pitrou20db5112011-08-19 20:32:34 +02003989 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003990 self.check_interrupted_read_retry(lambda x: x,
3991 mode="r")
3992
3993 @unittest.skipUnless(threading, 'Threading required for this test.')
3994 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3995 """Check that a buffered write, when it gets interrupted (either
3996 returning a partial result or EINTR), properly invokes the signal
3997 handler and retries if the latter returned successfully."""
3998 select = support.import_module("select")
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003999
Antoine Pitrou707ce822011-02-25 21:24:11 +00004000 # A quantity that exceeds the buffer size of an anonymous pipe's
4001 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02004002 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00004003 r, w = os.pipe()
4004 fdopen_kwargs["closefd"] = False
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004005
Antoine Pitrou707ce822011-02-25 21:24:11 +00004006 # We need a separate thread to read from the pipe and allow the
4007 # write() to finish. This thread is started after the SIGALRM is
4008 # received (forcing a first EINTR in write()).
4009 read_results = []
4010 write_finished = False
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004011 error = None
Antoine Pitrou707ce822011-02-25 21:24:11 +00004012 def _read():
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004013 try:
4014 while not write_finished:
4015 while r in select.select([r], [], [], 1.0)[0]:
4016 s = os.read(r, 1024)
4017 read_results.append(s)
4018 except BaseException as exc:
4019 nonlocal error
4020 error = exc
Antoine Pitrou707ce822011-02-25 21:24:11 +00004021 t = threading.Thread(target=_read)
4022 t.daemon = True
4023 def alarm1(sig, frame):
4024 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004025 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004026 def alarm2(sig, frame):
4027 t.start()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004028
4029 large_data = item * N
Antoine Pitrou707ce822011-02-25 21:24:11 +00004030 signal.signal(signal.SIGALRM, alarm1)
4031 try:
4032 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004033 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004034 # Expected behaviour:
4035 # - first raw write() is partial (because of the limited pipe buffer
4036 # and the first alarm)
4037 # - second raw write() returns EINTR (because of the second alarm)
4038 # - subsequent write()s are successful (either partial or complete)
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004039 written = wio.write(large_data)
4040 self.assertEqual(N, written)
4041
Antoine Pitrou707ce822011-02-25 21:24:11 +00004042 wio.flush()
4043 write_finished = True
4044 t.join()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004045
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004046 self.assertIsNone(error)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004047 self.assertEqual(N, sum(len(x) for x in read_results))
4048 finally:
4049 write_finished = True
4050 os.close(w)
4051 os.close(r)
4052 # This is deliberate. If we didn't close the file descriptor
4053 # before closing wio, wio would try to flush its internal
4054 # buffer, and could block (in case of failure).
4055 try:
4056 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004057 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00004058 if e.errno != errno.EBADF:
4059 raise
4060
Antoine Pitrou20db5112011-08-19 20:32:34 +02004061 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004062 self.check_interrupted_write_retry(b"x", mode="wb")
4063
Antoine Pitrou20db5112011-08-19 20:32:34 +02004064 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004065 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4066
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004067
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004068class CSignalsTest(SignalsTest):
4069 io = io
4070
4071class PySignalsTest(SignalsTest):
4072 io = pyio
4073
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004074 # Handling reentrancy issues would slow down _pyio even more, so the
4075 # tests are disabled.
4076 test_reentrant_write_buffered = None
4077 test_reentrant_write_text = None
4078
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004079
Ezio Melottidaa42c72013-03-23 16:30:16 +02004080def load_tests(*args):
Gregory P. Smith1bef9072015-04-14 13:24:34 -07004081 tests = (CIOTest, PyIOTest, APIMismatchTest,
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004082 CBufferedReaderTest, PyBufferedReaderTest,
4083 CBufferedWriterTest, PyBufferedWriterTest,
4084 CBufferedRWPairTest, PyBufferedRWPairTest,
4085 CBufferedRandomTest, PyBufferedRandomTest,
4086 StatefulIncrementalDecoderTest,
4087 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
4088 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004089 CMiscIOTest, PyMiscIOTest,
4090 CSignalsTest, PySignalsTest,
4091 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004092
4093 # Put the namespaces of the IO module we are testing and some useful mock
4094 # classes in the __dict__ of each test.
4095 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00004096 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004097 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
4098 c_io_ns = {name : getattr(io, name) for name in all_members}
4099 py_io_ns = {name : getattr(pyio, name) for name in all_members}
4100 globs = globals()
4101 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
4102 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
4103 # Avoid turning open into a bound method.
4104 py_io_ns["open"] = pyio.OpenWrapper
4105 for test in tests:
4106 if test.__name__.startswith("C"):
4107 for name, obj in c_io_ns.items():
4108 setattr(test, name, obj)
4109 elif test.__name__.startswith("Py"):
4110 for name, obj in py_io_ns.items():
4111 setattr(test, name, obj)
4112
Ezio Melottidaa42c72013-03-23 16:30:16 +02004113 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
4114 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004115
4116if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02004117 unittest.main()