blob: 8a2111cbd71c6285301d1349b6b3b313ca3d5f53 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Berker Peksagce643912015-05-06 06:33:17 +030038from test.support.script_helper import assert_python_ok, run_python_until_end
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000047
Martin Panter6bb91f32016-05-28 00:41:57 +000048try:
49 import ctypes
50except ImportError:
51 def byteslike(*pos, **kw):
52 return array.array("b", bytes(*pos, **kw))
53else:
54 def byteslike(*pos, **kw):
55 """Create a bytes-like object having no string or sequence methods"""
56 data = bytes(*pos, **kw)
57 obj = EmptyStruct()
58 ctypes.resize(obj, len(data))
59 memoryview(obj).cast("B")[:] = data
60 return obj
61 class EmptyStruct(ctypes.Structure):
62 pass
63
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000064def _default_chunk_size():
65 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000066 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000067 return f._CHUNK_SIZE
68
69
Antoine Pitrou328ec742010-09-14 18:37:24 +000070class MockRawIOWithoutRead:
71 """A RawIO implementation without read(), so as to exercise the default
72 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000073
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000074 def __init__(self, read_stack=()):
75 self._read_stack = list(read_stack)
76 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000077 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000078 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000079
Guido van Rossum01a27522007-03-07 01:00:12 +000080 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000081 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000082 return len(b)
83
84 def writable(self):
85 return True
86
Guido van Rossum68bbcd22007-02-27 17:19:33 +000087 def fileno(self):
88 return 42
89
90 def readable(self):
91 return True
92
Guido van Rossum01a27522007-03-07 01:00:12 +000093 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000094 return True
95
Guido van Rossum01a27522007-03-07 01:00:12 +000096 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000097 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000098
99 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000100 return 0 # same comment as above
101
102 def readinto(self, buf):
103 self._reads += 1
104 max_len = len(buf)
105 try:
106 data = self._read_stack[0]
107 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000108 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000109 return 0
110 if data is None:
111 del self._read_stack[0]
112 return None
113 n = len(data)
114 if len(data) <= max_len:
115 del self._read_stack[0]
116 buf[:n] = data
117 return n
118 else:
119 buf[:] = data[:max_len]
120 self._read_stack[0] = data[max_len:]
121 return max_len
122
123 def truncate(self, pos=None):
124 return pos
125
Antoine Pitrou328ec742010-09-14 18:37:24 +0000126class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
127 pass
128
129class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
130 pass
131
132
133class MockRawIO(MockRawIOWithoutRead):
134
135 def read(self, n=None):
136 self._reads += 1
137 try:
138 return self._read_stack.pop(0)
139 except:
140 self._extraneous_reads += 1
141 return b""
142
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000143class CMockRawIO(MockRawIO, io.RawIOBase):
144 pass
145
146class PyMockRawIO(MockRawIO, pyio.RawIOBase):
147 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000148
Guido van Rossuma9e20242007-03-08 00:43:48 +0000149
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000150class MisbehavedRawIO(MockRawIO):
151 def write(self, b):
152 return super().write(b) * 2
153
154 def read(self, n=None):
155 return super().read(n) * 2
156
157 def seek(self, pos, whence):
158 return -123
159
160 def tell(self):
161 return -456
162
163 def readinto(self, buf):
164 super().readinto(buf)
165 return len(buf) * 5
166
167class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
168 pass
169
170class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
171 pass
172
173
174class CloseFailureIO(MockRawIO):
175 closed = 0
176
177 def close(self):
178 if not self.closed:
179 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200180 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000181
182class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
183 pass
184
185class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
186 pass
187
188
189class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000190
191 def __init__(self, data):
192 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000193 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000194
195 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000196 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000197 self.read_history.append(None if res is None else len(res))
198 return res
199
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000200 def readinto(self, b):
201 res = super().readinto(b)
202 self.read_history.append(res)
203 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000204
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000205class CMockFileIO(MockFileIO, io.BytesIO):
206 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000207
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000208class PyMockFileIO(MockFileIO, pyio.BytesIO):
209 pass
210
211
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000212class MockUnseekableIO:
213 def seekable(self):
214 return False
215
216 def seek(self, *args):
217 raise self.UnsupportedOperation("not seekable")
218
219 def tell(self, *args):
220 raise self.UnsupportedOperation("not seekable")
221
Martin Panter754aab22016-03-31 07:21:56 +0000222 def truncate(self, *args):
223 raise self.UnsupportedOperation("not seekable")
224
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000225class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
226 UnsupportedOperation = io.UnsupportedOperation
227
228class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
229 UnsupportedOperation = pyio.UnsupportedOperation
230
231
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000232class MockNonBlockWriterIO:
233
234 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000235 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000236 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000237
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000238 def pop_written(self):
239 s = b"".join(self._write_stack)
240 self._write_stack[:] = []
241 return s
242
243 def block_on(self, char):
244 """Block when a given char is encountered."""
245 self._blocker_char = char
246
247 def readable(self):
248 return True
249
250 def seekable(self):
251 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000252
Guido van Rossum01a27522007-03-07 01:00:12 +0000253 def writable(self):
254 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000255
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000256 def write(self, b):
257 b = bytes(b)
258 n = -1
259 if self._blocker_char:
260 try:
261 n = b.index(self._blocker_char)
262 except ValueError:
263 pass
264 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100265 if n > 0:
266 # write data up to the first blocker
267 self._write_stack.append(b[:n])
268 return n
269 else:
270 # cancel blocker and indicate would block
271 self._blocker_char = None
272 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000273 self._write_stack.append(b)
274 return len(b)
275
276class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
277 BlockingIOError = io.BlockingIOError
278
279class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
280 BlockingIOError = pyio.BlockingIOError
281
Guido van Rossuma9e20242007-03-08 00:43:48 +0000282
Guido van Rossum28524c72007-02-27 05:47:44 +0000283class IOTest(unittest.TestCase):
284
Neal Norwitze7789b12008-03-24 06:18:09 +0000285 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000286 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000287
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000288 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000289 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000290
Guido van Rossum28524c72007-02-27 05:47:44 +0000291 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000292 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000293 f.truncate(0)
294 self.assertEqual(f.tell(), 5)
295 f.seek(0)
296
297 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000298 self.assertEqual(f.seek(0), 0)
299 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000300 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000301 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000302 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000303 buffer = bytearray(b" world\n\n\n")
304 self.assertEqual(f.write(buffer), 9)
305 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000306 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000307 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000308 self.assertEqual(f.seek(-1, 2), 13)
309 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000310
Guido van Rossum87429772007-04-10 21:06:59 +0000311 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000312 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000313 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000314
Guido van Rossum9b76da62007-04-11 01:09:03 +0000315 def read_ops(self, f, buffered=False):
316 data = f.read(5)
317 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000318 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000319 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000320 self.assertEqual(bytes(data), b" worl")
321 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000322 self.assertEqual(f.readinto(data), 2)
323 self.assertEqual(len(data), 5)
324 self.assertEqual(data[:2], b"d\n")
325 self.assertEqual(f.seek(0), 0)
326 self.assertEqual(f.read(20), b"hello world\n")
327 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000328 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000329 self.assertEqual(f.seek(-6, 2), 6)
330 self.assertEqual(f.read(5), b"world")
331 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000332 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000333 self.assertEqual(f.seek(-6, 1), 5)
334 self.assertEqual(f.read(5), b" worl")
335 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000336 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000337 if buffered:
338 f.seek(0)
339 self.assertEqual(f.read(), b"hello world\n")
340 f.seek(6)
341 self.assertEqual(f.read(), b"world\n")
342 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000343 f.seek(0)
344 data = byteslike(5)
345 self.assertEqual(f.readinto1(data), 5)
346 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000347
Guido van Rossum34d69e52007-04-10 20:08:41 +0000348 LARGE = 2**31
349
Guido van Rossum53807da2007-04-10 19:01:47 +0000350 def large_file_ops(self, f):
351 assert f.readable()
352 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000353 self.assertEqual(f.seek(self.LARGE), self.LARGE)
354 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000355 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000356 self.assertEqual(f.tell(), self.LARGE + 3)
357 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000358 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000359 self.assertEqual(f.tell(), self.LARGE + 2)
360 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000361 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000362 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000363 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
364 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000365 self.assertEqual(f.read(2), b"x")
366
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000367 def test_invalid_operations(self):
368 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000369 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000370 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000371 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000372 self.assertRaises(exc, fp.read)
373 self.assertRaises(exc, fp.readline)
374 with self.open(support.TESTFN, "wb", buffering=0) as fp:
375 self.assertRaises(exc, fp.read)
376 self.assertRaises(exc, fp.readline)
377 with self.open(support.TESTFN, "rb", buffering=0) as fp:
378 self.assertRaises(exc, fp.write, b"blah")
379 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000380 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000381 self.assertRaises(exc, fp.write, b"blah")
382 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000383 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000384 self.assertRaises(exc, fp.write, "blah")
385 self.assertRaises(exc, fp.writelines, ["blah\n"])
386 # Non-zero seeking from current or end pos
387 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
388 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000389
Martin Panter754aab22016-03-31 07:21:56 +0000390 def test_optional_abilities(self):
391 # Test for OSError when optional APIs are not supported
392 # The purpose of this test is to try fileno(), reading, writing and
393 # seeking operations with various objects that indicate they do not
394 # support these operations.
395
396 def pipe_reader():
397 [r, w] = os.pipe()
398 os.close(w) # So that read() is harmless
399 return self.FileIO(r, "r")
400
401 def pipe_writer():
402 [r, w] = os.pipe()
403 self.addCleanup(os.close, r)
404 # Guarantee that we can write into the pipe without blocking
405 thread = threading.Thread(target=os.read, args=(r, 100))
406 thread.start()
407 self.addCleanup(thread.join)
408 return self.FileIO(w, "w")
409
410 def buffered_reader():
411 return self.BufferedReader(self.MockUnseekableIO())
412
413 def buffered_writer():
414 return self.BufferedWriter(self.MockUnseekableIO())
415
416 def buffered_random():
417 return self.BufferedRandom(self.BytesIO())
418
419 def buffered_rw_pair():
420 return self.BufferedRWPair(self.MockUnseekableIO(),
421 self.MockUnseekableIO())
422
423 def text_reader():
424 class UnseekableReader(self.MockUnseekableIO):
425 writable = self.BufferedIOBase.writable
426 write = self.BufferedIOBase.write
427 return self.TextIOWrapper(UnseekableReader(), "ascii")
428
429 def text_writer():
430 class UnseekableWriter(self.MockUnseekableIO):
431 readable = self.BufferedIOBase.readable
432 read = self.BufferedIOBase.read
433 return self.TextIOWrapper(UnseekableWriter(), "ascii")
434
435 tests = (
436 (pipe_reader, "fr"), (pipe_writer, "fw"),
437 (buffered_reader, "r"), (buffered_writer, "w"),
438 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
439 (text_reader, "r"), (text_writer, "w"),
440 (self.BytesIO, "rws"), (self.StringIO, "rws"),
441 )
442 for [test, abilities] in tests:
443 if test is pipe_writer and not threading:
444 continue # Skip subtest that uses a background thread
445 with self.subTest(test), test() as obj:
446 readable = "r" in abilities
447 self.assertEqual(obj.readable(), readable)
448 writable = "w" in abilities
449 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000450
451 if isinstance(obj, self.TextIOBase):
452 data = "3"
453 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
454 data = b"3"
455 else:
456 self.fail("Unknown base class")
457
458 if "f" in abilities:
459 obj.fileno()
460 else:
461 self.assertRaises(OSError, obj.fileno)
462
463 if readable:
464 obj.read(1)
465 obj.read()
466 else:
467 self.assertRaises(OSError, obj.read, 1)
468 self.assertRaises(OSError, obj.read)
469
470 if writable:
471 obj.write(data)
472 else:
473 self.assertRaises(OSError, obj.write, data)
474
Martin Panter3ee147f2016-03-31 21:05:31 +0000475 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000476 pipe_reader, pipe_writer):
477 # Pipes seem to appear as seekable on Windows
478 continue
479 seekable = "s" in abilities
480 self.assertEqual(obj.seekable(), seekable)
481
Martin Panter754aab22016-03-31 07:21:56 +0000482 if seekable:
483 obj.tell()
484 obj.seek(0)
485 else:
486 self.assertRaises(OSError, obj.tell)
487 self.assertRaises(OSError, obj.seek, 0)
488
489 if writable and seekable:
490 obj.truncate()
491 obj.truncate(0)
492 else:
493 self.assertRaises(OSError, obj.truncate)
494 self.assertRaises(OSError, obj.truncate, 0)
495
Antoine Pitrou13348842012-01-29 18:36:34 +0100496 def test_open_handles_NUL_chars(self):
497 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300498 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100499
500 bytes_fn = bytes(fn_with_NUL, 'ascii')
501 with warnings.catch_warnings():
502 warnings.simplefilter("ignore", DeprecationWarning)
503 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100504
Guido van Rossum28524c72007-02-27 05:47:44 +0000505 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000506 with self.open(support.TESTFN, "wb", buffering=0) as f:
507 self.assertEqual(f.readable(), False)
508 self.assertEqual(f.writable(), True)
509 self.assertEqual(f.seekable(), True)
510 self.write_ops(f)
511 with self.open(support.TESTFN, "rb", buffering=0) as f:
512 self.assertEqual(f.readable(), True)
513 self.assertEqual(f.writable(), False)
514 self.assertEqual(f.seekable(), True)
515 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000516
Guido van Rossum87429772007-04-10 21:06:59 +0000517 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000518 with self.open(support.TESTFN, "wb") as f:
519 self.assertEqual(f.readable(), False)
520 self.assertEqual(f.writable(), True)
521 self.assertEqual(f.seekable(), True)
522 self.write_ops(f)
523 with self.open(support.TESTFN, "rb") as f:
524 self.assertEqual(f.readable(), True)
525 self.assertEqual(f.writable(), False)
526 self.assertEqual(f.seekable(), True)
527 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000528
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000529 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000530 with self.open(support.TESTFN, "wb") as f:
531 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
532 with self.open(support.TESTFN, "rb") as f:
533 self.assertEqual(f.readline(), b"abc\n")
534 self.assertEqual(f.readline(10), b"def\n")
535 self.assertEqual(f.readline(2), b"xy")
536 self.assertEqual(f.readline(4), b"zzy\n")
537 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000538 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000539 self.assertRaises(TypeError, f.readline, 5.3)
540 with self.open(support.TESTFN, "r") as f:
541 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000542
Guido van Rossum28524c72007-02-27 05:47:44 +0000543 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000544 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000545 self.write_ops(f)
546 data = f.getvalue()
547 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000548 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000549 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000550
Guido van Rossum53807da2007-04-10 19:01:47 +0000551 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000552 # On Windows and Mac OSX this test comsumes large resources; It takes
553 # a long time to build the >2GB file and takes >2GB of disk space
554 # therefore the resource must be enabled to run this test.
555 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600556 support.requires(
557 'largefile',
558 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000559 with self.open(support.TESTFN, "w+b", 0) as f:
560 self.large_file_ops(f)
561 with self.open(support.TESTFN, "w+b") as f:
562 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000563
564 def test_with_open(self):
565 for bufsize in (0, 1, 100):
566 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000567 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000568 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000569 self.assertEqual(f.closed, True)
570 f = None
571 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000572 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000573 1/0
574 except ZeroDivisionError:
575 self.assertEqual(f.closed, True)
576 else:
577 self.fail("1/0 didn't raise an exception")
578
Antoine Pitrou08838b62009-01-21 00:55:13 +0000579 # issue 5008
580 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000581 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000582 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000583 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000584 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000585 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000586 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000587 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300588 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000589
Guido van Rossum87429772007-04-10 21:06:59 +0000590 def test_destructor(self):
591 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000592 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000593 def __del__(self):
594 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000595 try:
596 f = super().__del__
597 except AttributeError:
598 pass
599 else:
600 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000601 def close(self):
602 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000603 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000604 def flush(self):
605 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000606 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000607 with support.check_warnings(('', ResourceWarning)):
608 f = MyFileIO(support.TESTFN, "wb")
609 f.write(b"xxx")
610 del f
611 support.gc_collect()
612 self.assertEqual(record, [1, 2, 3])
613 with self.open(support.TESTFN, "rb") as f:
614 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000615
616 def _check_base_destructor(self, base):
617 record = []
618 class MyIO(base):
619 def __init__(self):
620 # This exercises the availability of attributes on object
621 # destruction.
622 # (in the C version, close() is called by the tp_dealloc
623 # function, not by __del__)
624 self.on_del = 1
625 self.on_close = 2
626 self.on_flush = 3
627 def __del__(self):
628 record.append(self.on_del)
629 try:
630 f = super().__del__
631 except AttributeError:
632 pass
633 else:
634 f()
635 def close(self):
636 record.append(self.on_close)
637 super().close()
638 def flush(self):
639 record.append(self.on_flush)
640 super().flush()
641 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000642 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000643 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000644 self.assertEqual(record, [1, 2, 3])
645
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000646 def test_IOBase_destructor(self):
647 self._check_base_destructor(self.IOBase)
648
649 def test_RawIOBase_destructor(self):
650 self._check_base_destructor(self.RawIOBase)
651
652 def test_BufferedIOBase_destructor(self):
653 self._check_base_destructor(self.BufferedIOBase)
654
655 def test_TextIOBase_destructor(self):
656 self._check_base_destructor(self.TextIOBase)
657
Guido van Rossum87429772007-04-10 21:06:59 +0000658 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000659 with self.open(support.TESTFN, "wb") as f:
660 f.write(b"xxx")
661 with self.open(support.TESTFN, "rb") as f:
662 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000663
Guido van Rossumd4103952007-04-12 05:44:49 +0000664 def test_array_writes(self):
665 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000666 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000667 def check(f):
668 with f:
669 self.assertEqual(f.write(a), n)
670 f.writelines((a,))
671 check(self.BytesIO())
672 check(self.FileIO(support.TESTFN, "w"))
673 check(self.BufferedWriter(self.MockRawIO()))
674 check(self.BufferedRandom(self.MockRawIO()))
675 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000676
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000677 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000678 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000679 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000680
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000681 def test_read_closed(self):
682 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000683 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000684 with self.open(support.TESTFN, "r") as f:
685 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000686 self.assertEqual(file.read(), "egg\n")
687 file.seek(0)
688 file.close()
689 self.assertRaises(ValueError, file.read)
690
691 def test_no_closefd_with_filename(self):
692 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000693 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000694
695 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000696 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000697 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000698 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000699 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000700 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000701 self.assertEqual(file.buffer.raw.closefd, False)
702
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000703 def test_garbage_collection(self):
704 # FileIO objects are collected, and collecting them flushes
705 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000706 with support.check_warnings(('', ResourceWarning)):
707 f = self.FileIO(support.TESTFN, "wb")
708 f.write(b"abcxxx")
709 f.f = f
710 wr = weakref.ref(f)
711 del f
712 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300713 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000714 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000715 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000716
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000717 def test_unbounded_file(self):
718 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
719 zero = "/dev/zero"
720 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000721 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000722 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000723 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000724 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000725 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000726 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000727 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000728 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000729 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000730 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000731 self.assertRaises(OverflowError, f.read)
732
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200733 def check_flush_error_on_close(self, *args, **kwargs):
734 # Test that the file is closed despite failed flush
735 # and that flush() is called before file closed.
736 f = self.open(*args, **kwargs)
737 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000738 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200739 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200740 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000741 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200742 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600743 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200744 self.assertTrue(closed) # flush() called
745 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200746 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200747
748 def test_flush_error_on_close(self):
749 # raw file
750 # Issue #5700: io.FileIO calls flush() after file closed
751 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
752 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
753 self.check_flush_error_on_close(fd, 'wb', buffering=0)
754 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
755 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
756 os.close(fd)
757 # buffered io
758 self.check_flush_error_on_close(support.TESTFN, 'wb')
759 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
760 self.check_flush_error_on_close(fd, 'wb')
761 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
762 self.check_flush_error_on_close(fd, 'wb', closefd=False)
763 os.close(fd)
764 # text io
765 self.check_flush_error_on_close(support.TESTFN, 'w')
766 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
767 self.check_flush_error_on_close(fd, 'w')
768 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
769 self.check_flush_error_on_close(fd, 'w', closefd=False)
770 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000771
772 def test_multi_close(self):
773 f = self.open(support.TESTFN, "wb", buffering=0)
774 f.close()
775 f.close()
776 f.close()
777 self.assertRaises(ValueError, f.flush)
778
Antoine Pitrou328ec742010-09-14 18:37:24 +0000779 def test_RawIOBase_read(self):
780 # Exercise the default RawIOBase.read() implementation (which calls
781 # readinto() internally).
782 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
783 self.assertEqual(rawio.read(2), b"ab")
784 self.assertEqual(rawio.read(2), b"c")
785 self.assertEqual(rawio.read(2), b"d")
786 self.assertEqual(rawio.read(2), None)
787 self.assertEqual(rawio.read(2), b"ef")
788 self.assertEqual(rawio.read(2), b"g")
789 self.assertEqual(rawio.read(2), None)
790 self.assertEqual(rawio.read(2), b"")
791
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400792 def test_types_have_dict(self):
793 test = (
794 self.IOBase(),
795 self.RawIOBase(),
796 self.TextIOBase(),
797 self.StringIO(),
798 self.BytesIO()
799 )
800 for obj in test:
801 self.assertTrue(hasattr(obj, "__dict__"))
802
Ross Lagerwall59142db2011-10-31 20:34:46 +0200803 def test_opener(self):
804 with self.open(support.TESTFN, "w") as f:
805 f.write("egg\n")
806 fd = os.open(support.TESTFN, os.O_RDONLY)
807 def opener(path, flags):
808 return fd
809 with self.open("non-existent", "r", opener=opener) as f:
810 self.assertEqual(f.read(), "egg\n")
811
Barry Warsaw480e2852016-06-08 17:47:26 -0400812 def test_bad_opener_negative_1(self):
813 # Issue #27066.
814 def badopener(fname, flags):
815 return -1
816 with self.assertRaises(ValueError) as cm:
817 open('non-existent', 'r', opener=badopener)
818 self.assertEqual(str(cm.exception), 'opener returned -1')
819
820 def test_bad_opener_other_negative(self):
821 # Issue #27066.
822 def badopener(fname, flags):
823 return -2
824 with self.assertRaises(ValueError) as cm:
825 open('non-existent', 'r', opener=badopener)
826 self.assertEqual(str(cm.exception), 'opener returned -2')
827
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200828 def test_fileio_closefd(self):
829 # Issue #4841
830 with self.open(__file__, 'rb') as f1, \
831 self.open(__file__, 'rb') as f2:
832 fileio = self.FileIO(f1.fileno(), closefd=False)
833 # .__init__() must not close f1
834 fileio.__init__(f2.fileno(), closefd=False)
835 f1.readline()
836 # .close() must not close f2
837 fileio.close()
838 f2.readline()
839
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300840 def test_nonbuffered_textio(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200841 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300842 with self.assertRaises(ValueError):
843 self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300844
845 def test_invalid_newline(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200846 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300847 with self.assertRaises(ValueError):
848 self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300849
Martin Panter6bb91f32016-05-28 00:41:57 +0000850 def test_buffered_readinto_mixin(self):
851 # Test the implementation provided by BufferedIOBase
852 class Stream(self.BufferedIOBase):
853 def read(self, size):
854 return b"12345"
855 read1 = read
856 stream = Stream()
857 for method in ("readinto", "readinto1"):
858 with self.subTest(method):
859 buffer = byteslike(5)
860 self.assertEqual(getattr(stream, method)(buffer), 5)
861 self.assertEqual(bytes(buffer), b"12345")
862
Ethan Furmand62548a2016-06-04 14:38:43 -0700863 def test_fspath_support(self):
864 class PathLike:
865 def __init__(self, path):
866 self.path = path
867
868 def __fspath__(self):
869 return self.path
870
871 def check_path_succeeds(path):
872 with self.open(path, "w") as f:
873 f.write("egg\n")
874
875 with self.open(path, "r") as f:
876 self.assertEqual(f.read(), "egg\n")
877
878 check_path_succeeds(PathLike(support.TESTFN))
879 check_path_succeeds(PathLike(support.TESTFN.encode('utf-8')))
880
881 bad_path = PathLike(TypeError)
Brett Cannonc78ca1e2016-06-24 12:03:43 -0700882 with self.assertRaises(TypeError):
Ethan Furmand62548a2016-06-04 14:38:43 -0700883 self.open(bad_path, 'w')
884
885 # ensure that refcounting is correct with some error conditions
886 with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
887 self.open(PathLike(support.TESTFN), 'rwxa')
888
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200889
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000890class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200891
892 def test_IOBase_finalize(self):
893 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
894 # class which inherits IOBase and an object of this class are caught
895 # in a reference cycle and close() is already in the method cache.
896 class MyIO(self.IOBase):
897 def close(self):
898 pass
899
900 # create an instance to populate the method cache
901 MyIO()
902 obj = MyIO()
903 obj.obj = obj
904 wr = weakref.ref(obj)
905 del MyIO
906 del obj
907 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300908 self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000909
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000910class PyIOTest(IOTest):
911 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000912
Guido van Rossuma9e20242007-03-08 00:43:48 +0000913
Gregory P. Smith1bef9072015-04-14 13:24:34 -0700914@support.cpython_only
915class APIMismatchTest(unittest.TestCase):
Gregory P. Smith054b0652015-04-14 12:58:05 -0700916
Gregory P. Smith054b0652015-04-14 12:58:05 -0700917 def test_RawIOBase_io_in_pyio_match(self):
918 """Test that pyio RawIOBase class has all c RawIOBase methods"""
Antoine Pitrou45d61562015-05-20 21:50:59 +0200919 mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
920 ignore=('__weakref__',))
Gregory P. Smith054b0652015-04-14 12:58:05 -0700921 self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
922
923 def test_RawIOBase_pyio_in_io_match(self):
924 """Test that c RawIOBase class has all pyio RawIOBase methods"""
925 mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
926 self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
927
928
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000929class CommonBufferedTests:
930 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
931
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000932 def test_detach(self):
933 raw = self.MockRawIO()
934 buf = self.tp(raw)
935 self.assertIs(buf.detach(), raw)
936 self.assertRaises(ValueError, buf.detach)
937
Benjamin Peterson10e76b62014-12-21 20:51:50 -0600938 repr(buf) # Should still work
939
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000940 def test_fileno(self):
941 rawio = self.MockRawIO()
942 bufio = self.tp(rawio)
943
Ezio Melottib3aedd42010-11-20 19:04:17 +0000944 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000945
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000946 def test_invalid_args(self):
947 rawio = self.MockRawIO()
948 bufio = self.tp(rawio)
949 # Invalid whence
950 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +0200951 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000952
953 def test_override_destructor(self):
954 tp = self.tp
955 record = []
956 class MyBufferedIO(tp):
957 def __del__(self):
958 record.append(1)
959 try:
960 f = super().__del__
961 except AttributeError:
962 pass
963 else:
964 f()
965 def close(self):
966 record.append(2)
967 super().close()
968 def flush(self):
969 record.append(3)
970 super().flush()
971 rawio = self.MockRawIO()
972 bufio = MyBufferedIO(rawio)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000973 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000974 support.gc_collect()
Martin Panter754aab22016-03-31 07:21:56 +0000975 self.assertEqual(record, [1, 2, 3])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000976
977 def test_context_manager(self):
978 # Test usability as a context manager
979 rawio = self.MockRawIO()
980 bufio = self.tp(rawio)
981 def _with():
982 with bufio:
983 pass
984 _with()
985 # bufio should now be closed, and using it a second time should raise
986 # a ValueError.
987 self.assertRaises(ValueError, _with)
988
989 def test_error_through_destructor(self):
990 # Test that the exception state is not modified by a destructor,
991 # even if close() fails.
992 rawio = self.CloseFailureIO()
993 def f():
994 self.tp(rawio).xyzzy
995 with support.captured_output("stderr") as s:
996 self.assertRaises(AttributeError, f)
997 s = s.getvalue().strip()
998 if s:
999 # The destructor *may* have printed an unraisable error, check it
1000 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001001 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001002 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +00001003
Antoine Pitrou716c4442009-05-23 19:04:03 +00001004 def test_repr(self):
1005 raw = self.MockRawIO()
1006 b = self.tp(raw)
Serhiy Storchaka521e5862014-07-22 15:00:37 +03001007 clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001008 self.assertEqual(repr(b), "<%s>" % clsname)
1009 raw.name = "dummy"
1010 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
1011 raw.name = b"dummy"
1012 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
1013
Antoine Pitrou6be88762010-05-03 16:48:20 +00001014 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001015 # Test that buffered file is closed despite failed flush
1016 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00001017 raw = self.MockRawIO()
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001018 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00001019 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001020 closed[:] = [b.closed, raw.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001021 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00001022 raw.flush = bad_flush
1023 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001024 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001025 self.assertTrue(b.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001026 self.assertTrue(raw.closed)
1027 self.assertTrue(closed) # flush() called
1028 self.assertFalse(closed[0]) # flush() called before file closed
1029 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02001030 raw.flush = lambda: None # break reference loop
Benjamin Peterson68623612012-12-20 11:53:11 -06001031
1032 def test_close_error_on_close(self):
1033 raw = self.MockRawIO()
1034 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001035 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -06001036 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001037 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -06001038 raw.close = bad_close
1039 b = self.tp(raw)
1040 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001041 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001042 b.close()
1043 self.assertEqual(err.exception.args, ('close',))
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001044 self.assertIsInstance(err.exception.__context__, OSError)
Benjamin Peterson68623612012-12-20 11:53:11 -06001045 self.assertEqual(err.exception.__context__.args, ('flush',))
1046 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00001047
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001048 def test_nonnormalized_close_error_on_close(self):
1049 # Issue #21677
1050 raw = self.MockRawIO()
1051 def bad_flush():
1052 raise non_existing_flush
1053 def bad_close():
1054 raise non_existing_close
1055 raw.close = bad_close
1056 b = self.tp(raw)
1057 b.flush = bad_flush
1058 with self.assertRaises(NameError) as err: # exception not swallowed
1059 b.close()
1060 self.assertIn('non_existing_close', str(err.exception))
1061 self.assertIsInstance(err.exception.__context__, NameError)
1062 self.assertIn('non_existing_flush', str(err.exception.__context__))
1063 self.assertFalse(b.closed)
1064
Antoine Pitrou6be88762010-05-03 16:48:20 +00001065 def test_multi_close(self):
1066 raw = self.MockRawIO()
1067 b = self.tp(raw)
1068 b.close()
1069 b.close()
1070 b.close()
1071 self.assertRaises(ValueError, b.flush)
1072
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001073 def test_unseekable(self):
1074 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1075 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1076 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1077
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00001078 def test_readonly_attributes(self):
1079 raw = self.MockRawIO()
1080 buf = self.tp(raw)
1081 x = self.MockRawIO()
1082 with self.assertRaises(AttributeError):
1083 buf.raw = x
1084
Guido van Rossum78892e42007-04-06 17:31:18 +00001085
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001086class SizeofTest:
1087
1088 @support.cpython_only
1089 def test_sizeof(self):
1090 bufsize1 = 4096
1091 bufsize2 = 8192
1092 rawio = self.MockRawIO()
1093 bufio = self.tp(rawio, buffer_size=bufsize1)
1094 size = sys.getsizeof(bufio) - bufsize1
1095 rawio = self.MockRawIO()
1096 bufio = self.tp(rawio, buffer_size=bufsize2)
1097 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
1098
Jesus Ceadc469452012-10-04 12:37:56 +02001099 @support.cpython_only
1100 def test_buffer_freeing(self) :
1101 bufsize = 4096
1102 rawio = self.MockRawIO()
1103 bufio = self.tp(rawio, buffer_size=bufsize)
1104 size = sys.getsizeof(bufio) - bufsize
1105 bufio.close()
1106 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001107
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001108class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1109 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001110
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001111 def test_constructor(self):
1112 rawio = self.MockRawIO([b"abc"])
1113 bufio = self.tp(rawio)
1114 bufio.__init__(rawio)
1115 bufio.__init__(rawio, buffer_size=1024)
1116 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001117 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001118 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1119 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1120 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1121 rawio = self.MockRawIO([b"abc"])
1122 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001123 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001124
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001125 def test_uninitialized(self):
1126 bufio = self.tp.__new__(self.tp)
1127 del bufio
1128 bufio = self.tp.__new__(self.tp)
1129 self.assertRaisesRegex((ValueError, AttributeError),
1130 'uninitialized|has no attribute',
1131 bufio.read, 0)
1132 bufio.__init__(self.MockRawIO())
1133 self.assertEqual(bufio.read(0), b'')
1134
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001135 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001136 for arg in (None, 7):
1137 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1138 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001139 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001140 # Invalid args
1141 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001142
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001143 def test_read1(self):
1144 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1145 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001146 self.assertEqual(b"a", bufio.read(1))
1147 self.assertEqual(b"b", bufio.read1(1))
1148 self.assertEqual(rawio._reads, 1)
1149 self.assertEqual(b"c", bufio.read1(100))
1150 self.assertEqual(rawio._reads, 1)
1151 self.assertEqual(b"d", bufio.read1(100))
1152 self.assertEqual(rawio._reads, 2)
1153 self.assertEqual(b"efg", bufio.read1(100))
1154 self.assertEqual(rawio._reads, 3)
1155 self.assertEqual(b"", bufio.read1(100))
1156 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001157 # Invalid args
1158 self.assertRaises(ValueError, bufio.read1, -1)
1159
1160 def test_readinto(self):
1161 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1162 bufio = self.tp(rawio)
1163 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001164 self.assertEqual(bufio.readinto(b), 2)
1165 self.assertEqual(b, b"ab")
1166 self.assertEqual(bufio.readinto(b), 2)
1167 self.assertEqual(b, b"cd")
1168 self.assertEqual(bufio.readinto(b), 2)
1169 self.assertEqual(b, b"ef")
1170 self.assertEqual(bufio.readinto(b), 1)
1171 self.assertEqual(b, b"gf")
1172 self.assertEqual(bufio.readinto(b), 0)
1173 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +02001174 rawio = self.MockRawIO((b"abc", None))
1175 bufio = self.tp(rawio)
1176 self.assertEqual(bufio.readinto(b), 2)
1177 self.assertEqual(b, b"ab")
1178 self.assertEqual(bufio.readinto(b), 1)
1179 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001180
Benjamin Petersona96fea02014-06-22 14:17:44 -07001181 def test_readinto1(self):
1182 buffer_size = 10
1183 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1184 bufio = self.tp(rawio, buffer_size=buffer_size)
1185 b = bytearray(2)
1186 self.assertEqual(bufio.peek(3), b'abc')
1187 self.assertEqual(rawio._reads, 1)
1188 self.assertEqual(bufio.readinto1(b), 2)
1189 self.assertEqual(b, b"ab")
1190 self.assertEqual(rawio._reads, 1)
1191 self.assertEqual(bufio.readinto1(b), 1)
1192 self.assertEqual(b[:1], b"c")
1193 self.assertEqual(rawio._reads, 1)
1194 self.assertEqual(bufio.readinto1(b), 2)
1195 self.assertEqual(b, b"de")
1196 self.assertEqual(rawio._reads, 2)
1197 b = bytearray(2*buffer_size)
1198 self.assertEqual(bufio.peek(3), b'fgh')
1199 self.assertEqual(rawio._reads, 3)
1200 self.assertEqual(bufio.readinto1(b), 6)
1201 self.assertEqual(b[:6], b"fghjkl")
1202 self.assertEqual(rawio._reads, 4)
1203
1204 def test_readinto_array(self):
1205 buffer_size = 60
1206 data = b"a" * 26
1207 rawio = self.MockRawIO((data,))
1208 bufio = self.tp(rawio, buffer_size=buffer_size)
1209
1210 # Create an array with element size > 1 byte
1211 b = array.array('i', b'x' * 32)
1212 assert len(b) != 16
1213
1214 # Read into it. We should get as many *bytes* as we can fit into b
1215 # (which is more than the number of elements)
1216 n = bufio.readinto(b)
1217 self.assertGreater(n, len(b))
1218
1219 # Check that old contents of b are preserved
1220 bm = memoryview(b).cast('B')
1221 self.assertLess(n, len(bm))
1222 self.assertEqual(bm[:n], data[:n])
1223 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1224
1225 def test_readinto1_array(self):
1226 buffer_size = 60
1227 data = b"a" * 26
1228 rawio = self.MockRawIO((data,))
1229 bufio = self.tp(rawio, buffer_size=buffer_size)
1230
1231 # Create an array with element size > 1 byte
1232 b = array.array('i', b'x' * 32)
1233 assert len(b) != 16
1234
1235 # Read into it. We should get as many *bytes* as we can fit into b
1236 # (which is more than the number of elements)
1237 n = bufio.readinto1(b)
1238 self.assertGreater(n, len(b))
1239
1240 # Check that old contents of b are preserved
1241 bm = memoryview(b).cast('B')
1242 self.assertLess(n, len(bm))
1243 self.assertEqual(bm[:n], data[:n])
1244 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1245
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001246 def test_readlines(self):
1247 def bufio():
1248 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1249 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001250 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1251 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1252 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001253
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001254 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001255 data = b"abcdefghi"
1256 dlen = len(data)
1257
1258 tests = [
1259 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1260 [ 100, [ 3, 3, 3], [ dlen ] ],
1261 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1262 ]
1263
1264 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001265 rawio = self.MockFileIO(data)
1266 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001267 pos = 0
1268 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001269 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001270 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001271 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001272 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001273
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001274 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +00001275 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001276 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1277 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001278 self.assertEqual(b"abcd", bufio.read(6))
1279 self.assertEqual(b"e", bufio.read(1))
1280 self.assertEqual(b"fg", bufio.read())
1281 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +02001282 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001283 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +00001284
Victor Stinnera80987f2011-05-25 22:47:16 +02001285 rawio = self.MockRawIO((b"a", None, None))
1286 self.assertEqual(b"a", rawio.readall())
1287 self.assertIsNone(rawio.readall())
1288
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001289 def test_read_past_eof(self):
1290 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1291 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001292
Ezio Melottib3aedd42010-11-20 19:04:17 +00001293 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001294
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001295 def test_read_all(self):
1296 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1297 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001298
Ezio Melottib3aedd42010-11-20 19:04:17 +00001299 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001300
Victor Stinner45df8202010-04-28 22:31:17 +00001301 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001302 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001303 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001304 try:
1305 # Write out many bytes with exactly the same number of 0's,
1306 # 1's... 255's. This will help us check that concurrent reading
1307 # doesn't duplicate or forget contents.
1308 N = 1000
1309 l = list(range(256)) * N
1310 random.shuffle(l)
1311 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001312 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +00001313 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001314 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001315 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001316 errors = []
1317 results = []
1318 def f():
1319 try:
1320 # Intra-buffer read then buffer-flushing read
1321 for n in cycle([1, 19]):
1322 s = bufio.read(n)
1323 if not s:
1324 break
1325 # list.append() is atomic
1326 results.append(s)
1327 except Exception as e:
1328 errors.append(e)
1329 raise
1330 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001331 with support.start_threads(threads):
1332 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001333 self.assertFalse(errors,
1334 "the following exceptions were caught: %r" % errors)
1335 s = b''.join(results)
1336 for i in range(256):
1337 c = bytes(bytearray([i]))
1338 self.assertEqual(s.count(c), N)
1339 finally:
1340 support.unlink(support.TESTFN)
1341
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001342 def test_unseekable(self):
1343 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1344 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1345 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1346 bufio.read(1)
1347 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1348 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1349
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001350 def test_misbehaved_io(self):
1351 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1352 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001353 self.assertRaises(OSError, bufio.seek, 0)
1354 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001355
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001356 def test_no_extraneous_read(self):
1357 # Issue #9550; when the raw IO object has satisfied the read request,
1358 # we should not issue any additional reads, otherwise it may block
1359 # (e.g. socket).
1360 bufsize = 16
1361 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1362 rawio = self.MockRawIO([b"x" * n])
1363 bufio = self.tp(rawio, bufsize)
1364 self.assertEqual(bufio.read(n), b"x" * n)
1365 # Simple case: one raw read is enough to satisfy the request.
1366 self.assertEqual(rawio._extraneous_reads, 0,
1367 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1368 # A more complex case where two raw reads are needed to satisfy
1369 # the request.
1370 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1371 bufio = self.tp(rawio, bufsize)
1372 self.assertEqual(bufio.read(n), b"x" * n)
1373 self.assertEqual(rawio._extraneous_reads, 0,
1374 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1375
Berker Peksagd10d6ae2015-05-12 17:01:05 +03001376 def test_read_on_closed(self):
1377 # Issue #23796
1378 b = io.BufferedReader(io.BytesIO(b"12"))
1379 b.read(1)
1380 b.close()
1381 self.assertRaises(ValueError, b.peek)
1382 self.assertRaises(ValueError, b.read1, 1)
1383
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001384
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001385class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001386 tp = io.BufferedReader
1387
1388 def test_constructor(self):
1389 BufferedReaderTest.test_constructor(self)
1390 # The allocation can succeed on 32-bit builds, e.g. with more
1391 # than 2GB RAM and a 64-bit kernel.
1392 if sys.maxsize > 0x7FFFFFFF:
1393 rawio = self.MockRawIO()
1394 bufio = self.tp(rawio)
1395 self.assertRaises((OverflowError, MemoryError, ValueError),
1396 bufio.__init__, rawio, sys.maxsize)
1397
1398 def test_initialization(self):
1399 rawio = self.MockRawIO([b"abc"])
1400 bufio = self.tp(rawio)
1401 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1402 self.assertRaises(ValueError, bufio.read)
1403 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1404 self.assertRaises(ValueError, bufio.read)
1405 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1406 self.assertRaises(ValueError, bufio.read)
1407
1408 def test_misbehaved_io_read(self):
1409 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1410 bufio = self.tp(rawio)
1411 # _pyio.BufferedReader seems to implement reading different, so that
1412 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001413 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001414
1415 def test_garbage_collection(self):
1416 # C BufferedReader objects are collected.
1417 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001418 with support.check_warnings(('', ResourceWarning)):
1419 rawio = self.FileIO(support.TESTFN, "w+b")
1420 f = self.tp(rawio)
1421 f.f = f
1422 wr = weakref.ref(f)
1423 del f
1424 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001425 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001426
R David Murray67bfe802013-02-23 21:51:05 -05001427 def test_args_error(self):
1428 # Issue #17275
1429 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1430 self.tp(io.BytesIO(), 1024, 1024, 1024)
1431
1432
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001433class PyBufferedReaderTest(BufferedReaderTest):
1434 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001435
Guido van Rossuma9e20242007-03-08 00:43:48 +00001436
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001437class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1438 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001439
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001440 def test_constructor(self):
1441 rawio = self.MockRawIO()
1442 bufio = self.tp(rawio)
1443 bufio.__init__(rawio)
1444 bufio.__init__(rawio, buffer_size=1024)
1445 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001446 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001447 bufio.flush()
1448 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1449 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1450 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1451 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001452 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001453 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001454 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001455
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001456 def test_uninitialized(self):
1457 bufio = self.tp.__new__(self.tp)
1458 del bufio
1459 bufio = self.tp.__new__(self.tp)
1460 self.assertRaisesRegex((ValueError, AttributeError),
1461 'uninitialized|has no attribute',
1462 bufio.write, b'')
1463 bufio.__init__(self.MockRawIO())
1464 self.assertEqual(bufio.write(b''), 0)
1465
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001466 def test_detach_flush(self):
1467 raw = self.MockRawIO()
1468 buf = self.tp(raw)
1469 buf.write(b"howdy!")
1470 self.assertFalse(raw._write_stack)
1471 buf.detach()
1472 self.assertEqual(raw._write_stack, [b"howdy!"])
1473
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001474 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001475 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001476 writer = self.MockRawIO()
1477 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001478 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001479 self.assertFalse(writer._write_stack)
Martin Panter6bb91f32016-05-28 00:41:57 +00001480 buffer = bytearray(b"def")
1481 bufio.write(buffer)
1482 buffer[:] = b"***" # Overwrite our copy of the data
1483 bufio.flush()
1484 self.assertEqual(b"".join(writer._write_stack), b"abcdef")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001485
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001486 def test_write_overflow(self):
1487 writer = self.MockRawIO()
1488 bufio = self.tp(writer, 8)
1489 contents = b"abcdefghijklmnop"
1490 for n in range(0, len(contents), 3):
1491 bufio.write(contents[n:n+3])
1492 flushed = b"".join(writer._write_stack)
1493 # At least (total - 8) bytes were implicitly flushed, perhaps more
1494 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001495 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001496
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001497 def check_writes(self, intermediate_func):
1498 # Lots of writes, test the flushed output is as expected.
1499 contents = bytes(range(256)) * 1000
1500 n = 0
1501 writer = self.MockRawIO()
1502 bufio = self.tp(writer, 13)
1503 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1504 def gen_sizes():
1505 for size in count(1):
1506 for i in range(15):
1507 yield size
1508 sizes = gen_sizes()
1509 while n < len(contents):
1510 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001511 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001512 intermediate_func(bufio)
1513 n += size
1514 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001515 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001516
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001517 def test_writes(self):
1518 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001519
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001520 def test_writes_and_flushes(self):
1521 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001522
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001523 def test_writes_and_seeks(self):
1524 def _seekabs(bufio):
1525 pos = bufio.tell()
1526 bufio.seek(pos + 1, 0)
1527 bufio.seek(pos - 1, 0)
1528 bufio.seek(pos, 0)
1529 self.check_writes(_seekabs)
1530 def _seekrel(bufio):
1531 pos = bufio.seek(0, 1)
1532 bufio.seek(+1, 1)
1533 bufio.seek(-1, 1)
1534 bufio.seek(pos, 0)
1535 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001536
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001537 def test_writes_and_truncates(self):
1538 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001539
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001540 def test_write_non_blocking(self):
1541 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001542 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001543
Ezio Melottib3aedd42010-11-20 19:04:17 +00001544 self.assertEqual(bufio.write(b"abcd"), 4)
1545 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001546 # 1 byte will be written, the rest will be buffered
1547 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001548 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001549
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001550 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1551 raw.block_on(b"0")
1552 try:
1553 bufio.write(b"opqrwxyz0123456789")
1554 except self.BlockingIOError as e:
1555 written = e.characters_written
1556 else:
1557 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001558 self.assertEqual(written, 16)
1559 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001560 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001561
Ezio Melottib3aedd42010-11-20 19:04:17 +00001562 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001563 s = raw.pop_written()
1564 # Previously buffered bytes were flushed
1565 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001566
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001567 def test_write_and_rewind(self):
1568 raw = io.BytesIO()
1569 bufio = self.tp(raw, 4)
1570 self.assertEqual(bufio.write(b"abcdef"), 6)
1571 self.assertEqual(bufio.tell(), 6)
1572 bufio.seek(0, 0)
1573 self.assertEqual(bufio.write(b"XY"), 2)
1574 bufio.seek(6, 0)
1575 self.assertEqual(raw.getvalue(), b"XYcdef")
1576 self.assertEqual(bufio.write(b"123456"), 6)
1577 bufio.flush()
1578 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001579
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001580 def test_flush(self):
1581 writer = self.MockRawIO()
1582 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001583 bufio.write(b"abc")
1584 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001585 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001586
Antoine Pitrou131a4892012-10-16 22:57:11 +02001587 def test_writelines(self):
1588 l = [b'ab', b'cd', b'ef']
1589 writer = self.MockRawIO()
1590 bufio = self.tp(writer, 8)
1591 bufio.writelines(l)
1592 bufio.flush()
1593 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1594
1595 def test_writelines_userlist(self):
1596 l = UserList([b'ab', b'cd', b'ef'])
1597 writer = self.MockRawIO()
1598 bufio = self.tp(writer, 8)
1599 bufio.writelines(l)
1600 bufio.flush()
1601 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1602
1603 def test_writelines_error(self):
1604 writer = self.MockRawIO()
1605 bufio = self.tp(writer, 8)
1606 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1607 self.assertRaises(TypeError, bufio.writelines, None)
1608 self.assertRaises(TypeError, bufio.writelines, 'abc')
1609
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001610 def test_destructor(self):
1611 writer = self.MockRawIO()
1612 bufio = self.tp(writer, 8)
1613 bufio.write(b"abc")
1614 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001615 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001616 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001617
1618 def test_truncate(self):
1619 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001620 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001621 bufio = self.tp(raw, 8)
1622 bufio.write(b"abcdef")
1623 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001624 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001625 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001626 self.assertEqual(f.read(), b"abc")
1627
Victor Stinner45df8202010-04-28 22:31:17 +00001628 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001629 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001630 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001631 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001632 # Write out many bytes from many threads and test they were
1633 # all flushed.
1634 N = 1000
1635 contents = bytes(range(256)) * N
1636 sizes = cycle([1, 19])
1637 n = 0
1638 queue = deque()
1639 while n < len(contents):
1640 size = next(sizes)
1641 queue.append(contents[n:n+size])
1642 n += size
1643 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001644 # We use a real file object because it allows us to
1645 # exercise situations where the GIL is released before
1646 # writing the buffer to the raw streams. This is in addition
1647 # to concurrency issues due to switching threads in the middle
1648 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001649 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001650 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001651 errors = []
1652 def f():
1653 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001654 while True:
1655 try:
1656 s = queue.popleft()
1657 except IndexError:
1658 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001659 bufio.write(s)
1660 except Exception as e:
1661 errors.append(e)
1662 raise
1663 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001664 with support.start_threads(threads):
1665 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001666 self.assertFalse(errors,
1667 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001668 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001669 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001670 s = f.read()
1671 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001672 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001673 finally:
1674 support.unlink(support.TESTFN)
1675
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001676 def test_misbehaved_io(self):
1677 rawio = self.MisbehavedRawIO()
1678 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001679 self.assertRaises(OSError, bufio.seek, 0)
1680 self.assertRaises(OSError, bufio.tell)
1681 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001682
Florent Xicluna109d5732012-07-07 17:03:22 +02001683 def test_max_buffer_size_removal(self):
1684 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001685 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001686
Benjamin Peterson68623612012-12-20 11:53:11 -06001687 def test_write_error_on_close(self):
1688 raw = self.MockRawIO()
1689 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001690 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001691 raw.write = bad_write
1692 b = self.tp(raw)
1693 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001694 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001695 self.assertTrue(b.closed)
1696
Benjamin Peterson59406a92009-03-26 17:10:29 +00001697
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001698class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001699 tp = io.BufferedWriter
1700
1701 def test_constructor(self):
1702 BufferedWriterTest.test_constructor(self)
1703 # The allocation can succeed on 32-bit builds, e.g. with more
1704 # than 2GB RAM and a 64-bit kernel.
1705 if sys.maxsize > 0x7FFFFFFF:
1706 rawio = self.MockRawIO()
1707 bufio = self.tp(rawio)
1708 self.assertRaises((OverflowError, MemoryError, ValueError),
1709 bufio.__init__, rawio, sys.maxsize)
1710
1711 def test_initialization(self):
1712 rawio = self.MockRawIO()
1713 bufio = self.tp(rawio)
1714 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1715 self.assertRaises(ValueError, bufio.write, b"def")
1716 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1717 self.assertRaises(ValueError, bufio.write, b"def")
1718 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1719 self.assertRaises(ValueError, bufio.write, b"def")
1720
1721 def test_garbage_collection(self):
1722 # C BufferedWriter objects are collected, and collecting them flushes
1723 # all data to disk.
1724 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001725 with support.check_warnings(('', ResourceWarning)):
1726 rawio = self.FileIO(support.TESTFN, "w+b")
1727 f = self.tp(rawio)
1728 f.write(b"123xxx")
1729 f.x = f
1730 wr = weakref.ref(f)
1731 del f
1732 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001733 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001734 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001735 self.assertEqual(f.read(), b"123xxx")
1736
R David Murray67bfe802013-02-23 21:51:05 -05001737 def test_args_error(self):
1738 # Issue #17275
1739 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1740 self.tp(io.BytesIO(), 1024, 1024, 1024)
1741
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001742
1743class PyBufferedWriterTest(BufferedWriterTest):
1744 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001745
Guido van Rossum01a27522007-03-07 01:00:12 +00001746class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001747
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001748 def test_constructor(self):
1749 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001750 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001751
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001752 def test_uninitialized(self):
1753 pair = self.tp.__new__(self.tp)
1754 del pair
1755 pair = self.tp.__new__(self.tp)
1756 self.assertRaisesRegex((ValueError, AttributeError),
1757 'uninitialized|has no attribute',
1758 pair.read, 0)
1759 self.assertRaisesRegex((ValueError, AttributeError),
1760 'uninitialized|has no attribute',
1761 pair.write, b'')
1762 pair.__init__(self.MockRawIO(), self.MockRawIO())
1763 self.assertEqual(pair.read(0), b'')
1764 self.assertEqual(pair.write(b''), 0)
1765
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001766 def test_detach(self):
1767 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1768 self.assertRaises(self.UnsupportedOperation, pair.detach)
1769
Florent Xicluna109d5732012-07-07 17:03:22 +02001770 def test_constructor_max_buffer_size_removal(self):
1771 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001772 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001773
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001774 def test_constructor_with_not_readable(self):
1775 class NotReadable(MockRawIO):
1776 def readable(self):
1777 return False
1778
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001779 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001780
1781 def test_constructor_with_not_writeable(self):
1782 class NotWriteable(MockRawIO):
1783 def writable(self):
1784 return False
1785
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001786 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001787
1788 def test_read(self):
1789 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1790
1791 self.assertEqual(pair.read(3), b"abc")
1792 self.assertEqual(pair.read(1), b"d")
1793 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001794 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1795 self.assertEqual(pair.read(None), b"abc")
1796
1797 def test_readlines(self):
1798 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1799 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1800 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1801 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001802
1803 def test_read1(self):
1804 # .read1() is delegated to the underlying reader object, so this test
1805 # can be shallow.
1806 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1807
1808 self.assertEqual(pair.read1(3), b"abc")
1809
1810 def test_readinto(self):
Martin Panter6bb91f32016-05-28 00:41:57 +00001811 for method in ("readinto", "readinto1"):
1812 with self.subTest(method):
1813 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001814
Serhiy Storchaka5f1a5182016-09-11 14:41:02 +03001815 data = byteslike(b'\0' * 5)
Martin Panter6bb91f32016-05-28 00:41:57 +00001816 self.assertEqual(getattr(pair, method)(data), 5)
1817 self.assertEqual(bytes(data), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001818
1819 def test_write(self):
1820 w = self.MockRawIO()
1821 pair = self.tp(self.MockRawIO(), w)
1822
1823 pair.write(b"abc")
1824 pair.flush()
Martin Panter6bb91f32016-05-28 00:41:57 +00001825 buffer = bytearray(b"def")
1826 pair.write(buffer)
1827 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001828 pair.flush()
1829 self.assertEqual(w._write_stack, [b"abc", b"def"])
1830
1831 def test_peek(self):
1832 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1833
1834 self.assertTrue(pair.peek(3).startswith(b"abc"))
1835 self.assertEqual(pair.read(3), b"abc")
1836
1837 def test_readable(self):
1838 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1839 self.assertTrue(pair.readable())
1840
1841 def test_writeable(self):
1842 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1843 self.assertTrue(pair.writable())
1844
1845 def test_seekable(self):
1846 # BufferedRWPairs are never seekable, even if their readers and writers
1847 # are.
1848 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1849 self.assertFalse(pair.seekable())
1850
1851 # .flush() is delegated to the underlying writer object and has been
1852 # tested in the test_write method.
1853
1854 def test_close_and_closed(self):
1855 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1856 self.assertFalse(pair.closed)
1857 pair.close()
1858 self.assertTrue(pair.closed)
1859
Serhiy Storchaka7665be62015-03-24 23:21:57 +02001860 def test_reader_close_error_on_close(self):
1861 def reader_close():
1862 reader_non_existing
1863 reader = self.MockRawIO()
1864 reader.close = reader_close
1865 writer = self.MockRawIO()
1866 pair = self.tp(reader, writer)
1867 with self.assertRaises(NameError) as err:
1868 pair.close()
1869 self.assertIn('reader_non_existing', str(err.exception))
1870 self.assertTrue(pair.closed)
1871 self.assertFalse(reader.closed)
1872 self.assertTrue(writer.closed)
1873
1874 def test_writer_close_error_on_close(self):
1875 def writer_close():
1876 writer_non_existing
1877 reader = self.MockRawIO()
1878 writer = self.MockRawIO()
1879 writer.close = writer_close
1880 pair = self.tp(reader, writer)
1881 with self.assertRaises(NameError) as err:
1882 pair.close()
1883 self.assertIn('writer_non_existing', str(err.exception))
1884 self.assertFalse(pair.closed)
1885 self.assertTrue(reader.closed)
1886 self.assertFalse(writer.closed)
1887
1888 def test_reader_writer_close_error_on_close(self):
1889 def reader_close():
1890 reader_non_existing
1891 def writer_close():
1892 writer_non_existing
1893 reader = self.MockRawIO()
1894 reader.close = reader_close
1895 writer = self.MockRawIO()
1896 writer.close = writer_close
1897 pair = self.tp(reader, writer)
1898 with self.assertRaises(NameError) as err:
1899 pair.close()
1900 self.assertIn('reader_non_existing', str(err.exception))
1901 self.assertIsInstance(err.exception.__context__, NameError)
1902 self.assertIn('writer_non_existing', str(err.exception.__context__))
1903 self.assertFalse(pair.closed)
1904 self.assertFalse(reader.closed)
1905 self.assertFalse(writer.closed)
1906
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001907 def test_isatty(self):
1908 class SelectableIsAtty(MockRawIO):
1909 def __init__(self, isatty):
1910 MockRawIO.__init__(self)
1911 self._isatty = isatty
1912
1913 def isatty(self):
1914 return self._isatty
1915
1916 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1917 self.assertFalse(pair.isatty())
1918
1919 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1920 self.assertTrue(pair.isatty())
1921
1922 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1923 self.assertTrue(pair.isatty())
1924
1925 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1926 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001927
Benjamin Petersonbbd0a322014-09-29 22:46:57 -04001928 def test_weakref_clearing(self):
1929 brw = self.tp(self.MockRawIO(), self.MockRawIO())
1930 ref = weakref.ref(brw)
1931 brw = None
1932 ref = None # Shouldn't segfault.
1933
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001934class CBufferedRWPairTest(BufferedRWPairTest):
1935 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001936
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001937class PyBufferedRWPairTest(BufferedRWPairTest):
1938 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001939
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001940
1941class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1942 read_mode = "rb+"
1943 write_mode = "wb+"
1944
1945 def test_constructor(self):
1946 BufferedReaderTest.test_constructor(self)
1947 BufferedWriterTest.test_constructor(self)
1948
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001949 def test_uninitialized(self):
1950 BufferedReaderTest.test_uninitialized(self)
1951 BufferedWriterTest.test_uninitialized(self)
1952
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001953 def test_read_and_write(self):
1954 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001955 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001956
1957 self.assertEqual(b"as", rw.read(2))
1958 rw.write(b"ddd")
1959 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001960 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001961 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001962 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001963
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001964 def test_seek_and_tell(self):
1965 raw = self.BytesIO(b"asdfghjkl")
1966 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001967
Ezio Melottib3aedd42010-11-20 19:04:17 +00001968 self.assertEqual(b"as", rw.read(2))
1969 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001970 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001971 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001972
Antoine Pitroue05565e2011-08-20 14:39:23 +02001973 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001974 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001975 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001976 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001977 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001978 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001979 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001980 self.assertEqual(7, rw.tell())
1981 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001982 rw.flush()
1983 self.assertEqual(b"asdf123fl", raw.getvalue())
1984
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001985 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001986
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001987 def check_flush_and_read(self, read_func):
1988 raw = self.BytesIO(b"abcdefghi")
1989 bufio = self.tp(raw)
1990
Ezio Melottib3aedd42010-11-20 19:04:17 +00001991 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001992 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001993 self.assertEqual(b"ef", read_func(bufio, 2))
1994 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001995 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001996 self.assertEqual(6, bufio.tell())
1997 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001998 raw.seek(0, 0)
1999 raw.write(b"XYZ")
2000 # flush() resets the read buffer
2001 bufio.flush()
2002 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002003 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002004
2005 def test_flush_and_read(self):
2006 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2007
2008 def test_flush_and_readinto(self):
2009 def _readinto(bufio, n=-1):
2010 b = bytearray(n if n >= 0 else 9999)
2011 n = bufio.readinto(b)
2012 return bytes(b[:n])
2013 self.check_flush_and_read(_readinto)
2014
2015 def test_flush_and_peek(self):
2016 def _peek(bufio, n=-1):
2017 # This relies on the fact that the buffer can contain the whole
2018 # raw stream, otherwise peek() can return less.
2019 b = bufio.peek(n)
2020 if n != -1:
2021 b = b[:n]
2022 bufio.seek(len(b), 1)
2023 return b
2024 self.check_flush_and_read(_peek)
2025
2026 def test_flush_and_write(self):
2027 raw = self.BytesIO(b"abcdefghi")
2028 bufio = self.tp(raw)
2029
2030 bufio.write(b"123")
2031 bufio.flush()
2032 bufio.write(b"45")
2033 bufio.flush()
2034 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002035 self.assertEqual(b"12345fghi", raw.getvalue())
2036 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002037
2038 def test_threads(self):
2039 BufferedReaderTest.test_threads(self)
2040 BufferedWriterTest.test_threads(self)
2041
2042 def test_writes_and_peek(self):
2043 def _peek(bufio):
2044 bufio.peek(1)
2045 self.check_writes(_peek)
2046 def _peek(bufio):
2047 pos = bufio.tell()
2048 bufio.seek(-1, 1)
2049 bufio.peek(1)
2050 bufio.seek(pos, 0)
2051 self.check_writes(_peek)
2052
2053 def test_writes_and_reads(self):
2054 def _read(bufio):
2055 bufio.seek(-1, 1)
2056 bufio.read(1)
2057 self.check_writes(_read)
2058
2059 def test_writes_and_read1s(self):
2060 def _read1(bufio):
2061 bufio.seek(-1, 1)
2062 bufio.read1(1)
2063 self.check_writes(_read1)
2064
2065 def test_writes_and_readintos(self):
2066 def _read(bufio):
2067 bufio.seek(-1, 1)
2068 bufio.readinto(bytearray(1))
2069 self.check_writes(_read)
2070
Antoine Pitroua0ceb732009-08-06 20:29:56 +00002071 def test_write_after_readahead(self):
2072 # Issue #6629: writing after the buffer was filled by readahead should
2073 # first rewind the raw stream.
2074 for overwrite_size in [1, 5]:
2075 raw = self.BytesIO(b"A" * 10)
2076 bufio = self.tp(raw, 4)
2077 # Trigger readahead
2078 self.assertEqual(bufio.read(1), b"A")
2079 self.assertEqual(bufio.tell(), 1)
2080 # Overwriting should rewind the raw stream if it needs so
2081 bufio.write(b"B" * overwrite_size)
2082 self.assertEqual(bufio.tell(), overwrite_size + 1)
2083 # If the write size was smaller than the buffer size, flush() and
2084 # check that rewind happens.
2085 bufio.flush()
2086 self.assertEqual(bufio.tell(), overwrite_size + 1)
2087 s = raw.getvalue()
2088 self.assertEqual(s,
2089 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2090
Antoine Pitrou7c404892011-05-13 00:13:33 +02002091 def test_write_rewind_write(self):
2092 # Various combinations of reading / writing / seeking backwards / writing again
2093 def mutate(bufio, pos1, pos2):
2094 assert pos2 >= pos1
2095 # Fill the buffer
2096 bufio.seek(pos1)
2097 bufio.read(pos2 - pos1)
2098 bufio.write(b'\x02')
2099 # This writes earlier than the previous write, but still inside
2100 # the buffer.
2101 bufio.seek(pos1)
2102 bufio.write(b'\x01')
2103
2104 b = b"\x80\x81\x82\x83\x84"
2105 for i in range(0, len(b)):
2106 for j in range(i, len(b)):
2107 raw = self.BytesIO(b)
2108 bufio = self.tp(raw, 100)
2109 mutate(bufio, i, j)
2110 bufio.flush()
2111 expected = bytearray(b)
2112 expected[j] = 2
2113 expected[i] = 1
2114 self.assertEqual(raw.getvalue(), expected,
2115 "failed result for i=%d, j=%d" % (i, j))
2116
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00002117 def test_truncate_after_read_or_write(self):
2118 raw = self.BytesIO(b"A" * 10)
2119 bufio = self.tp(raw, 100)
2120 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2121 self.assertEqual(bufio.truncate(), 2)
2122 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2123 self.assertEqual(bufio.truncate(), 4)
2124
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002125 def test_misbehaved_io(self):
2126 BufferedReaderTest.test_misbehaved_io(self)
2127 BufferedWriterTest.test_misbehaved_io(self)
2128
Antoine Pitroue05565e2011-08-20 14:39:23 +02002129 def test_interleaved_read_write(self):
2130 # Test for issue #12213
2131 with self.BytesIO(b'abcdefgh') as raw:
2132 with self.tp(raw, 100) as f:
2133 f.write(b"1")
2134 self.assertEqual(f.read(1), b'b')
2135 f.write(b'2')
2136 self.assertEqual(f.read1(1), b'd')
2137 f.write(b'3')
2138 buf = bytearray(1)
2139 f.readinto(buf)
2140 self.assertEqual(buf, b'f')
2141 f.write(b'4')
2142 self.assertEqual(f.peek(1), b'h')
2143 f.flush()
2144 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2145
2146 with self.BytesIO(b'abc') as raw:
2147 with self.tp(raw, 100) as f:
2148 self.assertEqual(f.read(1), b'a')
2149 f.write(b"2")
2150 self.assertEqual(f.read(1), b'c')
2151 f.flush()
2152 self.assertEqual(raw.getvalue(), b'a2c')
2153
2154 def test_interleaved_readline_write(self):
2155 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2156 with self.tp(raw) as f:
2157 f.write(b'1')
2158 self.assertEqual(f.readline(), b'b\n')
2159 f.write(b'2')
2160 self.assertEqual(f.readline(), b'def\n')
2161 f.write(b'3')
2162 self.assertEqual(f.readline(), b'\n')
2163 f.flush()
2164 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2165
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002166 # You can't construct a BufferedRandom over a non-seekable stream.
2167 test_unseekable = None
2168
R David Murray67bfe802013-02-23 21:51:05 -05002169
Antoine Pitrou10f0c502012-07-29 19:02:46 +02002170class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002171 tp = io.BufferedRandom
2172
2173 def test_constructor(self):
2174 BufferedRandomTest.test_constructor(self)
2175 # The allocation can succeed on 32-bit builds, e.g. with more
2176 # than 2GB RAM and a 64-bit kernel.
2177 if sys.maxsize > 0x7FFFFFFF:
2178 rawio = self.MockRawIO()
2179 bufio = self.tp(rawio)
2180 self.assertRaises((OverflowError, MemoryError, ValueError),
2181 bufio.__init__, rawio, sys.maxsize)
2182
2183 def test_garbage_collection(self):
2184 CBufferedReaderTest.test_garbage_collection(self)
2185 CBufferedWriterTest.test_garbage_collection(self)
2186
R David Murray67bfe802013-02-23 21:51:05 -05002187 def test_args_error(self):
2188 # Issue #17275
2189 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
2190 self.tp(io.BytesIO(), 1024, 1024, 1024)
2191
2192
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002193class PyBufferedRandomTest(BufferedRandomTest):
2194 tp = pyio.BufferedRandom
2195
2196
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002197# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2198# properties:
2199# - A single output character can correspond to many bytes of input.
2200# - The number of input bytes to complete the character can be
2201# undetermined until the last input byte is received.
2202# - The number of input bytes can vary depending on previous input.
2203# - A single input byte can correspond to many characters of output.
2204# - The number of output characters can be undetermined until the
2205# last input byte is received.
2206# - The number of output characters can vary depending on previous input.
2207
2208class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
2209 """
2210 For testing seek/tell behavior with a stateful, buffering decoder.
2211
2212 Input is a sequence of words. Words may be fixed-length (length set
2213 by input) or variable-length (period-terminated). In variable-length
2214 mode, extra periods are ignored. Possible words are:
2215 - 'i' followed by a number sets the input length, I (maximum 99).
2216 When I is set to 0, words are space-terminated.
2217 - 'o' followed by a number sets the output length, O (maximum 99).
2218 - Any other word is converted into a word followed by a period on
2219 the output. The output word consists of the input word truncated
2220 or padded out with hyphens to make its length equal to O. If O
2221 is 0, the word is output verbatim without truncating or padding.
2222 I and O are initially set to 1. When I changes, any buffered input is
2223 re-scanned according to the new I. EOF also terminates the last word.
2224 """
2225
2226 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00002227 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002228 self.reset()
2229
2230 def __repr__(self):
2231 return '<SID %x>' % id(self)
2232
2233 def reset(self):
2234 self.i = 1
2235 self.o = 1
2236 self.buffer = bytearray()
2237
2238 def getstate(self):
2239 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
2240 return bytes(self.buffer), i*100 + o
2241
2242 def setstate(self, state):
2243 buffer, io = state
2244 self.buffer = bytearray(buffer)
2245 i, o = divmod(io, 100)
2246 self.i, self.o = i ^ 1, o ^ 1
2247
2248 def decode(self, input, final=False):
2249 output = ''
2250 for b in input:
2251 if self.i == 0: # variable-length, terminated with period
2252 if b == ord('.'):
2253 if self.buffer:
2254 output += self.process_word()
2255 else:
2256 self.buffer.append(b)
2257 else: # fixed-length, terminate after self.i bytes
2258 self.buffer.append(b)
2259 if len(self.buffer) == self.i:
2260 output += self.process_word()
2261 if final and self.buffer: # EOF terminates the last word
2262 output += self.process_word()
2263 return output
2264
2265 def process_word(self):
2266 output = ''
2267 if self.buffer[0] == ord('i'):
2268 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2269 elif self.buffer[0] == ord('o'):
2270 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2271 else:
2272 output = self.buffer.decode('ascii')
2273 if len(output) < self.o:
2274 output += '-'*self.o # pad out with hyphens
2275 if self.o:
2276 output = output[:self.o] # truncate to output length
2277 output += '.'
2278 self.buffer = bytearray()
2279 return output
2280
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002281 codecEnabled = False
2282
2283 @classmethod
2284 def lookupTestDecoder(cls, name):
2285 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00002286 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002287 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00002288 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002289 incrementalencoder=None,
2290 streamreader=None, streamwriter=None,
2291 incrementaldecoder=cls)
2292
2293# Register the previous decoder for testing.
2294# Disabled by default, tests will enable it.
2295codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2296
2297
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002298class StatefulIncrementalDecoderTest(unittest.TestCase):
2299 """
2300 Make sure the StatefulIncrementalDecoder actually works.
2301 """
2302
2303 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002304 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002305 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002306 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002307 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002308 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002309 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002310 # I=0, O=6 (variable-length input, fixed-length output)
2311 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2312 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002313 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002314 # I=6, O=3 (fixed-length input > fixed-length output)
2315 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2316 # I=0, then 3; O=29, then 15 (with longer output)
2317 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2318 'a----------------------------.' +
2319 'b----------------------------.' +
2320 'cde--------------------------.' +
2321 'abcdefghijabcde.' +
2322 'a.b------------.' +
2323 '.c.------------.' +
2324 'd.e------------.' +
2325 'k--------------.' +
2326 'l--------------.' +
2327 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002328 ]
2329
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002330 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002331 # Try a few one-shot test cases.
2332 for input, eof, output in self.test_cases:
2333 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002334 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002335
2336 # Also test an unfinished decode, followed by forcing EOF.
2337 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002338 self.assertEqual(d.decode(b'oiabcd'), '')
2339 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002340
2341class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002342
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002343 def setUp(self):
2344 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2345 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002346 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002347
Guido van Rossumd0712812007-04-11 16:32:43 +00002348 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002349 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002350
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002351 def test_constructor(self):
2352 r = self.BytesIO(b"\xc3\xa9\n\n")
2353 b = self.BufferedReader(r, 1000)
2354 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002355 t.__init__(b, encoding="latin-1", newline="\r\n")
2356 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002357 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002358 t.__init__(b, encoding="utf-8", line_buffering=True)
2359 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002360 self.assertEqual(t.line_buffering, True)
2361 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002362 self.assertRaises(TypeError, t.__init__, b, newline=42)
2363 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2364
Serhiy Storchakaf24131f2015-04-16 11:19:43 +03002365 def test_uninitialized(self):
2366 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2367 del t
2368 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2369 self.assertRaises(Exception, repr, t)
2370 self.assertRaisesRegex((ValueError, AttributeError),
2371 'uninitialized|has no attribute',
2372 t.read, 0)
2373 t.__init__(self.MockRawIO())
2374 self.assertEqual(t.read(0), '')
2375
Nick Coghlana9b15242014-02-04 22:11:18 +10002376 def test_non_text_encoding_codecs_are_rejected(self):
2377 # Ensure the constructor complains if passed a codec that isn't
2378 # marked as a text encoding
2379 # http://bugs.python.org/issue20404
2380 r = self.BytesIO()
2381 b = self.BufferedWriter(r)
2382 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2383 self.TextIOWrapper(b, encoding="hex")
2384
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002385 def test_detach(self):
2386 r = self.BytesIO()
2387 b = self.BufferedWriter(r)
2388 t = self.TextIOWrapper(b)
2389 self.assertIs(t.detach(), b)
2390
2391 t = self.TextIOWrapper(b, encoding="ascii")
2392 t.write("howdy")
2393 self.assertFalse(r.getvalue())
2394 t.detach()
2395 self.assertEqual(r.getvalue(), b"howdy")
2396 self.assertRaises(ValueError, t.detach)
2397
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002398 # Operations independent of the detached stream should still work
2399 repr(t)
2400 self.assertEqual(t.encoding, "ascii")
2401 self.assertEqual(t.errors, "strict")
2402 self.assertFalse(t.line_buffering)
2403
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002404 def test_repr(self):
2405 raw = self.BytesIO("hello".encode("utf-8"))
2406 b = self.BufferedReader(raw)
2407 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002408 modname = self.TextIOWrapper.__module__
2409 self.assertEqual(repr(t),
2410 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2411 raw.name = "dummy"
2412 self.assertEqual(repr(t),
2413 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002414 t.mode = "r"
2415 self.assertEqual(repr(t),
2416 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002417 raw.name = b"dummy"
2418 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002419 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002420
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002421 t.buffer.detach()
2422 repr(t) # Should not raise an exception
2423
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002424 def test_line_buffering(self):
2425 r = self.BytesIO()
2426 b = self.BufferedWriter(r, 1000)
2427 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002428 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002429 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002430 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002431 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002432 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002433 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002434
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002435 def test_default_encoding(self):
2436 old_environ = dict(os.environ)
2437 try:
2438 # try to get a user preferred encoding different than the current
2439 # locale encoding to check that TextIOWrapper() uses the current
2440 # locale encoding and not the user preferred encoding
2441 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2442 if key in os.environ:
2443 del os.environ[key]
2444
2445 current_locale_encoding = locale.getpreferredencoding(False)
2446 b = self.BytesIO()
2447 t = self.TextIOWrapper(b)
2448 self.assertEqual(t.encoding, current_locale_encoding)
2449 finally:
2450 os.environ.clear()
2451 os.environ.update(old_environ)
2452
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002453 @support.cpython_only
Serhiy Storchaka78980432013-01-15 01:12:17 +02002454 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002455 # Issue 15989
2456 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002457 b = self.BytesIO()
2458 b.fileno = lambda: _testcapi.INT_MAX + 1
2459 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2460 b.fileno = lambda: _testcapi.UINT_MAX + 1
2461 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2462
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002463 def test_encoding(self):
2464 # Check the encoding attribute is always set, and valid
2465 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002466 t = self.TextIOWrapper(b, encoding="utf-8")
2467 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002468 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002469 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002470 codecs.lookup(t.encoding)
2471
2472 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002473 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002474 b = self.BytesIO(b"abc\n\xff\n")
2475 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002476 self.assertRaises(UnicodeError, t.read)
2477 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002478 b = self.BytesIO(b"abc\n\xff\n")
2479 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002480 self.assertRaises(UnicodeError, t.read)
2481 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002482 b = self.BytesIO(b"abc\n\xff\n")
2483 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002484 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002485 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002486 b = self.BytesIO(b"abc\n\xff\n")
2487 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002488 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002489
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002490 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002491 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002492 b = self.BytesIO()
2493 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002494 self.assertRaises(UnicodeError, t.write, "\xff")
2495 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002496 b = self.BytesIO()
2497 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002498 self.assertRaises(UnicodeError, t.write, "\xff")
2499 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002500 b = self.BytesIO()
2501 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002502 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002503 t.write("abc\xffdef\n")
2504 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002505 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002506 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002507 b = self.BytesIO()
2508 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002509 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002510 t.write("abc\xffdef\n")
2511 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002512 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002513
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002514 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002515 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2516
2517 tests = [
2518 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002519 [ '', input_lines ],
2520 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2521 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2522 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002523 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002524 encodings = (
2525 'utf-8', 'latin-1',
2526 'utf-16', 'utf-16-le', 'utf-16-be',
2527 'utf-32', 'utf-32-le', 'utf-32-be',
2528 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002529
Guido van Rossum8358db22007-08-18 21:39:55 +00002530 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002531 # character in TextIOWrapper._pending_line.
2532 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002533 # XXX: str.encode() should return bytes
2534 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002535 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002536 for bufsize in range(1, 10):
2537 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002538 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2539 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002540 encoding=encoding)
2541 if do_reads:
2542 got_lines = []
2543 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002544 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002545 if c2 == '':
2546 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002547 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002548 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002549 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002550 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002551
2552 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002553 self.assertEqual(got_line, exp_line)
2554 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002555
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002556 def test_newlines_input(self):
2557 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002558 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2559 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002560 (None, normalized.decode("ascii").splitlines(keepends=True)),
2561 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002562 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2563 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2564 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002565 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002566 buf = self.BytesIO(testdata)
2567 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002568 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002569 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002570 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002571
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002572 def test_newlines_output(self):
2573 testdict = {
2574 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2575 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2576 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2577 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2578 }
2579 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2580 for newline, expected in tests:
2581 buf = self.BytesIO()
2582 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2583 txt.write("AAA\nB")
2584 txt.write("BB\nCCC\n")
2585 txt.write("X\rY\r\nZ")
2586 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002587 self.assertEqual(buf.closed, False)
2588 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002589
2590 def test_destructor(self):
2591 l = []
2592 base = self.BytesIO
2593 class MyBytesIO(base):
2594 def close(self):
2595 l.append(self.getvalue())
2596 base.close(self)
2597 b = MyBytesIO()
2598 t = self.TextIOWrapper(b, encoding="ascii")
2599 t.write("abc")
2600 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002601 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002602 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002603
2604 def test_override_destructor(self):
2605 record = []
2606 class MyTextIO(self.TextIOWrapper):
2607 def __del__(self):
2608 record.append(1)
2609 try:
2610 f = super().__del__
2611 except AttributeError:
2612 pass
2613 else:
2614 f()
2615 def close(self):
2616 record.append(2)
2617 super().close()
2618 def flush(self):
2619 record.append(3)
2620 super().flush()
2621 b = self.BytesIO()
2622 t = MyTextIO(b, encoding="ascii")
2623 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002624 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002625 self.assertEqual(record, [1, 2, 3])
2626
2627 def test_error_through_destructor(self):
2628 # Test that the exception state is not modified by a destructor,
2629 # even if close() fails.
2630 rawio = self.CloseFailureIO()
2631 def f():
2632 self.TextIOWrapper(rawio).xyzzy
2633 with support.captured_output("stderr") as s:
2634 self.assertRaises(AttributeError, f)
2635 s = s.getvalue().strip()
2636 if s:
2637 # The destructor *may* have printed an unraisable error, check it
2638 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002639 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002640 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002641
Guido van Rossum9b76da62007-04-11 01:09:03 +00002642 # Systematic tests of the text I/O API
2643
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002644 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002645 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002646 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002647 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002648 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002649 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002650 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002651 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002652 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002653 self.assertEqual(f.tell(), 0)
2654 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002655 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002656 self.assertEqual(f.seek(0), 0)
2657 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002658 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002659 self.assertEqual(f.read(2), "ab")
2660 self.assertEqual(f.read(1), "c")
2661 self.assertEqual(f.read(1), "")
2662 self.assertEqual(f.read(), "")
2663 self.assertEqual(f.tell(), cookie)
2664 self.assertEqual(f.seek(0), 0)
2665 self.assertEqual(f.seek(0, 2), cookie)
2666 self.assertEqual(f.write("def"), 3)
2667 self.assertEqual(f.seek(cookie), cookie)
2668 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002669 if enc.startswith("utf"):
2670 self.multi_line_test(f, enc)
2671 f.close()
2672
2673 def multi_line_test(self, f, enc):
2674 f.seek(0)
2675 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002676 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002677 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002678 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002679 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002680 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002681 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002682 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002683 wlines.append((f.tell(), line))
2684 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002685 f.seek(0)
2686 rlines = []
2687 while True:
2688 pos = f.tell()
2689 line = f.readline()
2690 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002691 break
2692 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002693 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002694
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002695 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002696 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002697 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002698 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002699 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002700 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002701 p2 = f.tell()
2702 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002703 self.assertEqual(f.tell(), p0)
2704 self.assertEqual(f.readline(), "\xff\n")
2705 self.assertEqual(f.tell(), p1)
2706 self.assertEqual(f.readline(), "\xff\n")
2707 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002708 f.seek(0)
2709 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002710 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002711 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002712 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002713 f.close()
2714
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002715 def test_seeking(self):
2716 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002717 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002718 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002719 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002720 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002721 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002722 suffix = bytes(u_suffix.encode("utf-8"))
2723 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002724 with self.open(support.TESTFN, "wb") as f:
2725 f.write(line*2)
2726 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2727 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002728 self.assertEqual(s, str(prefix, "ascii"))
2729 self.assertEqual(f.tell(), prefix_size)
2730 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002731
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002732 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002733 # Regression test for a specific bug
2734 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002735 with self.open(support.TESTFN, "wb") as f:
2736 f.write(data)
2737 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2738 f._CHUNK_SIZE # Just test that it exists
2739 f._CHUNK_SIZE = 2
2740 f.readline()
2741 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002742
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002743 def test_seek_and_tell(self):
2744 #Test seek/tell using the StatefulIncrementalDecoder.
2745 # Make test faster by doing smaller seeks
2746 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002747
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002748 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002749 """Tell/seek to various points within a data stream and ensure
2750 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002751 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002752 f.write(data)
2753 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002754 f = self.open(support.TESTFN, encoding='test_decoder')
2755 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002756 decoded = f.read()
2757 f.close()
2758
Neal Norwitze2b07052008-03-18 19:52:05 +00002759 for i in range(min_pos, len(decoded) + 1): # seek positions
2760 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002761 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002762 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002763 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002764 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002765 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002766 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002767 f.close()
2768
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002769 # Enable the test decoder.
2770 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002771
2772 # Run the tests.
2773 try:
2774 # Try each test case.
2775 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002776 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002777
2778 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002779 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2780 offset = CHUNK_SIZE - len(input)//2
2781 prefix = b'.'*offset
2782 # Don't bother seeking into the prefix (takes too long).
2783 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002784 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002785
2786 # Ensure our test decoder won't interfere with subsequent tests.
2787 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002788 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002789
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002790 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002791 data = "1234567890"
2792 tests = ("utf-16",
2793 "utf-16-le",
2794 "utf-16-be",
2795 "utf-32",
2796 "utf-32-le",
2797 "utf-32-be")
2798 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002799 buf = self.BytesIO()
2800 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002801 # Check if the BOM is written only once (see issue1753).
2802 f.write(data)
2803 f.write(data)
2804 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002805 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002806 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002807 self.assertEqual(f.read(), data * 2)
2808 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002809
Benjamin Petersona1b49012009-03-31 23:11:32 +00002810 def test_unreadable(self):
2811 class UnReadable(self.BytesIO):
2812 def readable(self):
2813 return False
2814 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002815 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002816
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002817 def test_read_one_by_one(self):
2818 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002819 reads = ""
2820 while True:
2821 c = txt.read(1)
2822 if not c:
2823 break
2824 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002825 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002826
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002827 def test_readlines(self):
2828 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2829 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2830 txt.seek(0)
2831 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2832 txt.seek(0)
2833 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2834
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002835 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002836 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002837 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002838 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002839 reads = ""
2840 while True:
2841 c = txt.read(128)
2842 if not c:
2843 break
2844 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002845 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002846
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002847 def test_writelines(self):
2848 l = ['ab', 'cd', 'ef']
2849 buf = self.BytesIO()
2850 txt = self.TextIOWrapper(buf)
2851 txt.writelines(l)
2852 txt.flush()
2853 self.assertEqual(buf.getvalue(), b'abcdef')
2854
2855 def test_writelines_userlist(self):
2856 l = UserList(['ab', 'cd', 'ef'])
2857 buf = self.BytesIO()
2858 txt = self.TextIOWrapper(buf)
2859 txt.writelines(l)
2860 txt.flush()
2861 self.assertEqual(buf.getvalue(), b'abcdef')
2862
2863 def test_writelines_error(self):
2864 txt = self.TextIOWrapper(self.BytesIO())
2865 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2866 self.assertRaises(TypeError, txt.writelines, None)
2867 self.assertRaises(TypeError, txt.writelines, b'abc')
2868
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002869 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002870 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002871
2872 # read one char at a time
2873 reads = ""
2874 while True:
2875 c = txt.read(1)
2876 if not c:
2877 break
2878 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002879 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002880
2881 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002882 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002883 txt._CHUNK_SIZE = 4
2884
2885 reads = ""
2886 while True:
2887 c = txt.read(4)
2888 if not c:
2889 break
2890 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002891 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002892
2893 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002894 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002895 txt._CHUNK_SIZE = 4
2896
2897 reads = txt.read(4)
2898 reads += txt.read(4)
2899 reads += txt.readline()
2900 reads += txt.readline()
2901 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002902 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002903
2904 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002905 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002906 txt._CHUNK_SIZE = 4
2907
2908 reads = txt.read(4)
2909 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002910 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002911
2912 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002913 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002914 txt._CHUNK_SIZE = 4
2915
2916 reads = txt.read(4)
2917 pos = txt.tell()
2918 txt.seek(0)
2919 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002920 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002921
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002922 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002923 buffer = self.BytesIO(self.testdata)
2924 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002925
2926 self.assertEqual(buffer.seekable(), txt.seekable())
2927
Antoine Pitroue4501852009-05-14 18:55:55 +00002928 def test_append_bom(self):
2929 # The BOM is not written again when appending to a non-empty file
2930 filename = support.TESTFN
2931 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2932 with self.open(filename, 'w', encoding=charset) as f:
2933 f.write('aaa')
2934 pos = f.tell()
2935 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002936 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002937
2938 with self.open(filename, 'a', encoding=charset) as f:
2939 f.write('xxx')
2940 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002941 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002942
2943 def test_seek_bom(self):
2944 # Same test, but when seeking manually
2945 filename = support.TESTFN
2946 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2947 with self.open(filename, 'w', encoding=charset) as f:
2948 f.write('aaa')
2949 pos = f.tell()
2950 with self.open(filename, 'r+', encoding=charset) as f:
2951 f.seek(pos)
2952 f.write('zzz')
2953 f.seek(0)
2954 f.write('bbb')
2955 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002956 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002957
Antoine Pitrou85e3ee72015-04-13 20:01:21 +02002958 def test_seek_append_bom(self):
2959 # Same test, but first seek to the start and then to the end
2960 filename = support.TESTFN
2961 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2962 with self.open(filename, 'w', encoding=charset) as f:
2963 f.write('aaa')
2964 with self.open(filename, 'a', encoding=charset) as f:
2965 f.seek(0)
2966 f.seek(0, self.SEEK_END)
2967 f.write('xxx')
2968 with self.open(filename, 'rb') as f:
2969 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
2970
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002971 def test_errors_property(self):
2972 with self.open(support.TESTFN, "w") as f:
2973 self.assertEqual(f.errors, "strict")
2974 with self.open(support.TESTFN, "w", errors="replace") as f:
2975 self.assertEqual(f.errors, "replace")
2976
Brett Cannon31f59292011-02-21 19:29:56 +00002977 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00002978 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002979 def test_threads_write(self):
2980 # Issue6750: concurrent writes could duplicate data
2981 event = threading.Event()
2982 with self.open(support.TESTFN, "w", buffering=1) as f:
2983 def run(n):
2984 text = "Thread%03d\n" % n
2985 event.wait()
2986 f.write(text)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03002987 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002988 for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03002989 with support.start_threads(threads, event.set):
2990 time.sleep(0.02)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002991 with self.open(support.TESTFN) as f:
2992 content = f.read()
2993 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002994 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002995
Antoine Pitrou6be88762010-05-03 16:48:20 +00002996 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002997 # Test that text file is closed despite failed flush
2998 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00002999 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003000 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00003001 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003002 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003003 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00003004 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003005 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06003006 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003007 self.assertTrue(txt.buffer.closed)
3008 self.assertTrue(closed) # flush() called
3009 self.assertFalse(closed[0]) # flush() called before file closed
3010 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02003011 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003012
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003013 def test_close_error_on_close(self):
3014 buffer = self.BytesIO(self.testdata)
3015 def bad_flush():
3016 raise OSError('flush')
3017 def bad_close():
3018 raise OSError('close')
3019 buffer.close = bad_close
3020 txt = self.TextIOWrapper(buffer, encoding="ascii")
3021 txt.flush = bad_flush
3022 with self.assertRaises(OSError) as err: # exception not swallowed
3023 txt.close()
3024 self.assertEqual(err.exception.args, ('close',))
3025 self.assertIsInstance(err.exception.__context__, OSError)
3026 self.assertEqual(err.exception.__context__.args, ('flush',))
3027 self.assertFalse(txt.closed)
3028
3029 def test_nonnormalized_close_error_on_close(self):
3030 # Issue #21677
3031 buffer = self.BytesIO(self.testdata)
3032 def bad_flush():
3033 raise non_existing_flush
3034 def bad_close():
3035 raise non_existing_close
3036 buffer.close = bad_close
3037 txt = self.TextIOWrapper(buffer, encoding="ascii")
3038 txt.flush = bad_flush
3039 with self.assertRaises(NameError) as err: # exception not swallowed
3040 txt.close()
3041 self.assertIn('non_existing_close', str(err.exception))
3042 self.assertIsInstance(err.exception.__context__, NameError)
3043 self.assertIn('non_existing_flush', str(err.exception.__context__))
3044 self.assertFalse(txt.closed)
3045
Antoine Pitrou6be88762010-05-03 16:48:20 +00003046 def test_multi_close(self):
3047 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3048 txt.close()
3049 txt.close()
3050 txt.close()
3051 self.assertRaises(ValueError, txt.flush)
3052
Antoine Pitrou0d739d72010-09-05 23:01:12 +00003053 def test_unseekable(self):
3054 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
3055 self.assertRaises(self.UnsupportedOperation, txt.tell)
3056 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3057
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003058 def test_readonly_attributes(self):
3059 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3060 buf = self.BytesIO(self.testdata)
3061 with self.assertRaises(AttributeError):
3062 txt.buffer = buf
3063
Antoine Pitroue96ec682011-07-23 21:46:35 +02003064 def test_rawio(self):
3065 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3066 # that subprocess.Popen() can have the required unbuffered
3067 # semantics with universal_newlines=True.
3068 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3069 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3070 # Reads
3071 self.assertEqual(txt.read(4), 'abcd')
3072 self.assertEqual(txt.readline(), 'efghi\n')
3073 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3074
3075 def test_rawio_write_through(self):
3076 # Issue #12591: with write_through=True, writes don't need a flush
3077 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3078 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3079 write_through=True)
3080 txt.write('1')
3081 txt.write('23\n4')
3082 txt.write('5')
3083 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3084
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003085 def test_bufio_write_through(self):
3086 # Issue #21396: write_through=True doesn't force a flush()
3087 # on the underlying binary buffered object.
3088 flush_called, write_called = [], []
3089 class BufferedWriter(self.BufferedWriter):
3090 def flush(self, *args, **kwargs):
3091 flush_called.append(True)
3092 return super().flush(*args, **kwargs)
3093 def write(self, *args, **kwargs):
3094 write_called.append(True)
3095 return super().write(*args, **kwargs)
3096
3097 rawio = self.BytesIO()
3098 data = b"a"
3099 bufio = BufferedWriter(rawio, len(data)*2)
3100 textio = self.TextIOWrapper(bufio, encoding='ascii',
3101 write_through=True)
3102 # write to the buffered io but don't overflow the buffer
3103 text = data.decode('ascii')
3104 textio.write(text)
3105
3106 # buffer.flush is not called with write_through=True
3107 self.assertFalse(flush_called)
3108 # buffer.write *is* called with write_through=True
3109 self.assertTrue(write_called)
3110 self.assertEqual(rawio.getvalue(), b"") # no flush
3111
3112 write_called = [] # reset
3113 textio.write(text * 10) # total content is larger than bufio buffer
3114 self.assertTrue(write_called)
3115 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3116
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003117 def test_read_nonbytes(self):
3118 # Issue #17106
3119 # Crash when underlying read() returns non-bytes
3120 t = self.TextIOWrapper(self.StringIO('a'))
3121 self.assertRaises(TypeError, t.read, 1)
3122 t = self.TextIOWrapper(self.StringIO('a'))
3123 self.assertRaises(TypeError, t.readline)
3124 t = self.TextIOWrapper(self.StringIO('a'))
3125 self.assertRaises(TypeError, t.read)
3126
3127 def test_illegal_decoder(self):
3128 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10003129 # Bypass the early encoding check added in issue 20404
3130 def _make_illegal_wrapper():
3131 quopri = codecs.lookup("quopri")
3132 quopri._is_text_encoding = True
3133 try:
3134 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3135 newline='\n', encoding="quopri")
3136 finally:
3137 quopri._is_text_encoding = False
3138 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003139 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10003140 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003141 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10003142 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003143 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10003144 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003145 self.assertRaises(TypeError, t.read)
3146
Antoine Pitrou712cb732013-12-21 15:51:54 +01003147 def _check_create_at_shutdown(self, **kwargs):
3148 # Issue #20037: creating a TextIOWrapper at shutdown
3149 # shouldn't crash the interpreter.
3150 iomod = self.io.__name__
3151 code = """if 1:
3152 import codecs
3153 import {iomod} as io
3154
3155 # Avoid looking up codecs at shutdown
3156 codecs.lookup('utf-8')
3157
3158 class C:
3159 def __init__(self):
3160 self.buf = io.BytesIO()
3161 def __del__(self):
3162 io.TextIOWrapper(self.buf, **{kwargs})
3163 print("ok")
3164 c = C()
3165 """.format(iomod=iomod, kwargs=kwargs)
3166 return assert_python_ok("-c", code)
3167
Serhiy Storchakaa7930372016-07-03 22:27:26 +03003168 @support.requires_type_collecting
Antoine Pitrou712cb732013-12-21 15:51:54 +01003169 def test_create_at_shutdown_without_encoding(self):
3170 rc, out, err = self._check_create_at_shutdown()
3171 if err:
3172 # Can error out with a RuntimeError if the module state
3173 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10003174 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01003175 else:
3176 self.assertEqual("ok", out.decode().strip())
3177
Serhiy Storchakaa7930372016-07-03 22:27:26 +03003178 @support.requires_type_collecting
Antoine Pitrou712cb732013-12-21 15:51:54 +01003179 def test_create_at_shutdown_with_encoding(self):
3180 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3181 errors='strict')
3182 self.assertFalse(err)
3183 self.assertEqual("ok", out.decode().strip())
3184
Antoine Pitroub8503892014-04-29 10:14:02 +02003185 def test_read_byteslike(self):
3186 r = MemviewBytesIO(b'Just some random string\n')
3187 t = self.TextIOWrapper(r, 'utf-8')
3188
3189 # TextIOwrapper will not read the full string, because
3190 # we truncate it to a multiple of the native int size
3191 # so that we can construct a more complex memoryview.
3192 bytes_val = _to_memoryview(r.getvalue()).tobytes()
3193
3194 self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3195
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003196 def test_issue22849(self):
3197 class F(object):
3198 def readable(self): return True
3199 def writable(self): return True
3200 def seekable(self): return True
3201
3202 for i in range(10):
3203 try:
3204 self.TextIOWrapper(F(), encoding='utf-8')
3205 except Exception:
3206 pass
3207
3208 F.tell = lambda x: 0
3209 t = self.TextIOWrapper(F(), encoding='utf-8')
3210
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003211
Antoine Pitroub8503892014-04-29 10:14:02 +02003212class MemviewBytesIO(io.BytesIO):
3213 '''A BytesIO object whose read method returns memoryviews
3214 rather than bytes'''
3215
3216 def read1(self, len_):
3217 return _to_memoryview(super().read1(len_))
3218
3219 def read(self, len_):
3220 return _to_memoryview(super().read(len_))
3221
3222def _to_memoryview(buf):
3223 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3224
3225 arr = array.array('i')
3226 idx = len(buf) - len(buf) % arr.itemsize
3227 arr.frombytes(buf[:idx])
3228 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003229
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003230
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003231class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003232 io = io
Steve Dowerfe8f4c92016-09-07 09:31:52 -07003233 shutdown_error = "RuntimeError: could not find io module state"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003234
3235 def test_initialization(self):
3236 r = self.BytesIO(b"\xc3\xa9\n\n")
3237 b = self.BufferedReader(r, 1000)
3238 t = self.TextIOWrapper(b)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003239 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
3240 self.assertRaises(ValueError, t.read)
3241
Benjamin Peterson10e76b62014-12-21 20:51:50 -06003242 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
3243 self.assertRaises(Exception, repr, t)
3244
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003245 def test_garbage_collection(self):
3246 # C TextIOWrapper objects are collected, and collecting them flushes
3247 # all data to disk.
3248 # The Python version has __del__, so it ends in gc.garbage instead.
Antoine Pitrou796564c2013-07-30 19:59:21 +02003249 with support.check_warnings(('', ResourceWarning)):
3250 rawio = io.FileIO(support.TESTFN, "wb")
3251 b = self.BufferedWriter(rawio)
3252 t = self.TextIOWrapper(b, encoding="ascii")
3253 t.write("456def")
3254 t.x = t
3255 wr = weakref.ref(t)
3256 del t
3257 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003258 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00003259 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003260 self.assertEqual(f.read(), b"456def")
3261
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003262 def test_rwpair_cleared_before_textio(self):
3263 # Issue 13070: TextIOWrapper's finalization would crash when called
3264 # after the reference to the underlying BufferedRWPair's writer got
3265 # cleared by the GC.
3266 for i in range(1000):
3267 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3268 t1 = self.TextIOWrapper(b1, encoding="ascii")
3269 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3270 t2 = self.TextIOWrapper(b2, encoding="ascii")
3271 # circular references
3272 t1.buddy = t2
3273 t2.buddy = t1
3274 support.gc_collect()
3275
3276
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003277class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003278 io = pyio
Steve Dower6d5cf8a2016-09-09 09:21:01 -07003279 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003280
3281
3282class IncrementalNewlineDecoderTest(unittest.TestCase):
3283
3284 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003285 # UTF-8 specific tests for a newline decoder
3286 def _check_decode(b, s, **kwargs):
3287 # We exercise getstate() / setstate() as well as decode()
3288 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003289 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003290 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003291 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003292
Antoine Pitrou180a3362008-12-14 16:36:46 +00003293 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003294
Antoine Pitrou180a3362008-12-14 16:36:46 +00003295 _check_decode(b'\xe8', "")
3296 _check_decode(b'\xa2', "")
3297 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003298
Antoine Pitrou180a3362008-12-14 16:36:46 +00003299 _check_decode(b'\xe8', "")
3300 _check_decode(b'\xa2', "")
3301 _check_decode(b'\x88', "\u8888")
3302
3303 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003304 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3305
Antoine Pitrou180a3362008-12-14 16:36:46 +00003306 decoder.reset()
3307 _check_decode(b'\n', "\n")
3308 _check_decode(b'\r', "")
3309 _check_decode(b'', "\n", final=True)
3310 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003311
Antoine Pitrou180a3362008-12-14 16:36:46 +00003312 _check_decode(b'\r', "")
3313 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003314
Antoine Pitrou180a3362008-12-14 16:36:46 +00003315 _check_decode(b'\r\r\n', "\n\n")
3316 _check_decode(b'\r', "")
3317 _check_decode(b'\r', "\n")
3318 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003319
Antoine Pitrou180a3362008-12-14 16:36:46 +00003320 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3321 _check_decode(b'\xe8\xa2\x88', "\u8888")
3322 _check_decode(b'\n', "\n")
3323 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3324 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003325
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003326 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003327 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003328 if encoding is not None:
3329 encoder = codecs.getincrementalencoder(encoding)()
3330 def _decode_bytewise(s):
3331 # Decode one byte at a time
3332 for b in encoder.encode(s):
3333 result.append(decoder.decode(bytes([b])))
3334 else:
3335 encoder = None
3336 def _decode_bytewise(s):
3337 # Decode one char at a time
3338 for c in s:
3339 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003340 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003341 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003342 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003343 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003344 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003345 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003346 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003347 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003348 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003349 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003350 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003351 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003352 input = "abc"
3353 if encoder is not None:
3354 encoder.reset()
3355 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003356 self.assertEqual(decoder.decode(input), "abc")
3357 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003358
3359 def test_newline_decoder(self):
3360 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003361 # None meaning the IncrementalNewlineDecoder takes unicode input
3362 # rather than bytes input
3363 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003364 'utf-16', 'utf-16-le', 'utf-16-be',
3365 'utf-32', 'utf-32-le', 'utf-32-be',
3366 )
3367 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003368 decoder = enc and codecs.getincrementaldecoder(enc)()
3369 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3370 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003371 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003372 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3373 self.check_newline_decoding_utf8(decoder)
3374
Antoine Pitrou66913e22009-03-06 23:40:56 +00003375 def test_newline_bytes(self):
3376 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3377 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003378 self.assertEqual(dec.newlines, None)
3379 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3380 self.assertEqual(dec.newlines, None)
3381 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3382 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003383 dec = self.IncrementalNewlineDecoder(None, translate=False)
3384 _check(dec)
3385 dec = self.IncrementalNewlineDecoder(None, translate=True)
3386 _check(dec)
3387
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003388class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3389 pass
3390
3391class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3392 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003393
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003394
Guido van Rossum01a27522007-03-07 01:00:12 +00003395# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003396
Guido van Rossum5abbf752007-08-27 17:39:33 +00003397class MiscIOTest(unittest.TestCase):
3398
Barry Warsaw40e82462008-11-20 20:14:50 +00003399 def tearDown(self):
3400 support.unlink(support.TESTFN)
3401
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003402 def test___all__(self):
3403 for name in self.io.__all__:
3404 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003405 self.assertIsNotNone(obj, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00003406 if name == "open":
3407 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003408 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003409 self.assertTrue(issubclass(obj, Exception), name)
3410 elif not name.startswith("SEEK_"):
3411 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003412
Barry Warsaw40e82462008-11-20 20:14:50 +00003413 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003414 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003415 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003416 f.close()
3417
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003418 with support.check_warnings(('', DeprecationWarning)):
3419 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003420 self.assertEqual(f.name, support.TESTFN)
3421 self.assertEqual(f.buffer.name, support.TESTFN)
3422 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3423 self.assertEqual(f.mode, "U")
3424 self.assertEqual(f.buffer.mode, "rb")
3425 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003426 f.close()
3427
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003428 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003429 self.assertEqual(f.mode, "w+")
3430 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3431 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003432
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003433 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003434 self.assertEqual(g.mode, "wb")
3435 self.assertEqual(g.raw.mode, "wb")
3436 self.assertEqual(g.name, f.fileno())
3437 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003438 f.close()
3439 g.close()
3440
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003441 def test_io_after_close(self):
3442 for kwargs in [
3443 {"mode": "w"},
3444 {"mode": "wb"},
3445 {"mode": "w", "buffering": 1},
3446 {"mode": "w", "buffering": 2},
3447 {"mode": "wb", "buffering": 0},
3448 {"mode": "r"},
3449 {"mode": "rb"},
3450 {"mode": "r", "buffering": 1},
3451 {"mode": "r", "buffering": 2},
3452 {"mode": "rb", "buffering": 0},
3453 {"mode": "w+"},
3454 {"mode": "w+b"},
3455 {"mode": "w+", "buffering": 1},
3456 {"mode": "w+", "buffering": 2},
3457 {"mode": "w+b", "buffering": 0},
3458 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003459 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003460 f.close()
3461 self.assertRaises(ValueError, f.flush)
3462 self.assertRaises(ValueError, f.fileno)
3463 self.assertRaises(ValueError, f.isatty)
3464 self.assertRaises(ValueError, f.__iter__)
3465 if hasattr(f, "peek"):
3466 self.assertRaises(ValueError, f.peek, 1)
3467 self.assertRaises(ValueError, f.read)
3468 if hasattr(f, "read1"):
3469 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003470 if hasattr(f, "readall"):
3471 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003472 if hasattr(f, "readinto"):
3473 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003474 if hasattr(f, "readinto1"):
3475 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003476 self.assertRaises(ValueError, f.readline)
3477 self.assertRaises(ValueError, f.readlines)
3478 self.assertRaises(ValueError, f.seek, 0)
3479 self.assertRaises(ValueError, f.tell)
3480 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003481 self.assertRaises(ValueError, f.write,
3482 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003483 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003484 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003485
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003486 def test_blockingioerror(self):
3487 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003488 class C(str):
3489 pass
3490 c = C("")
3491 b = self.BlockingIOError(1, c)
3492 c.b = b
3493 b.c = c
3494 wr = weakref.ref(c)
3495 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003496 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003497 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003498
3499 def test_abcs(self):
3500 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003501 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3502 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3503 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3504 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003505
3506 def _check_abc_inheritance(self, abcmodule):
3507 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003508 self.assertIsInstance(f, abcmodule.IOBase)
3509 self.assertIsInstance(f, abcmodule.RawIOBase)
3510 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3511 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003512 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003513 self.assertIsInstance(f, abcmodule.IOBase)
3514 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3515 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3516 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003517 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003518 self.assertIsInstance(f, abcmodule.IOBase)
3519 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3520 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3521 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003522
3523 def test_abc_inheritance(self):
3524 # Test implementations inherit from their respective ABCs
3525 self._check_abc_inheritance(self)
3526
3527 def test_abc_inheritance_official(self):
3528 # Test implementations inherit from the official ABCs of the
3529 # baseline "io" module.
3530 self._check_abc_inheritance(io)
3531
Antoine Pitroue033e062010-10-29 10:38:18 +00003532 def _check_warn_on_dealloc(self, *args, **kwargs):
3533 f = open(*args, **kwargs)
3534 r = repr(f)
3535 with self.assertWarns(ResourceWarning) as cm:
3536 f = None
3537 support.gc_collect()
3538 self.assertIn(r, str(cm.warning.args[0]))
3539
3540 def test_warn_on_dealloc(self):
3541 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3542 self._check_warn_on_dealloc(support.TESTFN, "wb")
3543 self._check_warn_on_dealloc(support.TESTFN, "w")
3544
3545 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3546 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003547 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003548 for fd in fds:
3549 try:
3550 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003551 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003552 if e.errno != errno.EBADF:
3553 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003554 self.addCleanup(cleanup_fds)
3555 r, w = os.pipe()
3556 fds += r, w
3557 self._check_warn_on_dealloc(r, *args, **kwargs)
3558 # When using closefd=False, there's no warning
3559 r, w = os.pipe()
3560 fds += r, w
Serhiy Storchaka94a619d2016-02-11 13:11:44 +02003561 with support.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00003562 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00003563
3564 def test_warn_on_dealloc_fd(self):
3565 self._check_warn_on_dealloc_fd("rb", buffering=0)
3566 self._check_warn_on_dealloc_fd("rb")
3567 self._check_warn_on_dealloc_fd("r")
3568
3569
Antoine Pitrou243757e2010-11-05 21:15:39 +00003570 def test_pickling(self):
3571 # Pickling file objects is forbidden
3572 for kwargs in [
3573 {"mode": "w"},
3574 {"mode": "wb"},
3575 {"mode": "wb", "buffering": 0},
3576 {"mode": "r"},
3577 {"mode": "rb"},
3578 {"mode": "rb", "buffering": 0},
3579 {"mode": "w+"},
3580 {"mode": "w+b"},
3581 {"mode": "w+b", "buffering": 0},
3582 ]:
3583 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3584 with self.open(support.TESTFN, **kwargs) as f:
3585 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3586
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003587 def test_nonblock_pipe_write_bigbuf(self):
3588 self._test_nonblock_pipe_write(16*1024)
3589
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003590 def test_nonblock_pipe_write_smallbuf(self):
3591 self._test_nonblock_pipe_write(1024)
3592
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003593 @unittest.skipUnless(hasattr(os, 'set_blocking'),
3594 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003595 def _test_nonblock_pipe_write(self, bufsize):
3596 sent = []
3597 received = []
3598 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003599 os.set_blocking(r, False)
3600 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003601
3602 # To exercise all code paths in the C implementation we need
3603 # to play with buffer sizes. For instance, if we choose a
3604 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3605 # then we will never get a partial write of the buffer.
3606 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3607 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3608
3609 with rf, wf:
3610 for N in 9999, 73, 7574:
3611 try:
3612 i = 0
3613 while True:
3614 msg = bytes([i % 26 + 97]) * N
3615 sent.append(msg)
3616 wf.write(msg)
3617 i += 1
3618
3619 except self.BlockingIOError as e:
3620 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003621 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003622 sent[-1] = sent[-1][:e.characters_written]
3623 received.append(rf.read())
3624 msg = b'BLOCKED'
3625 wf.write(msg)
3626 sent.append(msg)
3627
3628 while True:
3629 try:
3630 wf.flush()
3631 break
3632 except self.BlockingIOError as e:
3633 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003634 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003635 self.assertEqual(e.characters_written, 0)
3636 received.append(rf.read())
3637
3638 received += iter(rf.read, None)
3639
3640 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003641 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003642 self.assertTrue(wf.closed)
3643 self.assertTrue(rf.closed)
3644
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003645 def test_create_fail(self):
3646 # 'x' mode fails if file is existing
3647 with self.open(support.TESTFN, 'w'):
3648 pass
3649 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3650
3651 def test_create_writes(self):
3652 # 'x' mode opens for writing
3653 with self.open(support.TESTFN, 'xb') as f:
3654 f.write(b"spam")
3655 with self.open(support.TESTFN, 'rb') as f:
3656 self.assertEqual(b"spam", f.read())
3657
Christian Heimes7b648752012-09-10 14:48:43 +02003658 def test_open_allargs(self):
3659 # there used to be a buffer overflow in the parser for rawmode
3660 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3661
3662
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003663class CMiscIOTest(MiscIOTest):
3664 io = io
3665
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03003666 def test_readinto_buffer_overflow(self):
3667 # Issue #18025
3668 class BadReader(self.io.BufferedIOBase):
3669 def read(self, n=-1):
3670 return b'x' * 10**6
3671 bufio = BadReader()
3672 b = bytearray(2)
3673 self.assertRaises(ValueError, bufio.readinto, b)
3674
Antoine Pitrou25f85d42015-04-13 19:41:47 +02003675 @unittest.skipUnless(threading, 'Threading required for this test.')
3676 def check_daemon_threads_shutdown_deadlock(self, stream_name):
3677 # Issue #23309: deadlocks at shutdown should be avoided when a
3678 # daemon thread and the main thread both write to a file.
3679 code = """if 1:
3680 import sys
3681 import time
3682 import threading
3683
3684 file = sys.{stream_name}
3685
3686 def run():
3687 while True:
3688 file.write('.')
3689 file.flush()
3690
3691 thread = threading.Thread(target=run)
3692 thread.daemon = True
3693 thread.start()
3694
3695 time.sleep(0.5)
3696 file.write('!')
3697 file.flush()
3698 """.format_map(locals())
3699 res, _ = run_python_until_end("-c", code)
3700 err = res.err.decode()
3701 if res.rc != 0:
3702 # Failure: should be a fatal error
3703 self.assertIn("Fatal Python error: could not acquire lock "
3704 "for <_io.BufferedWriter name='<{stream_name}>'> "
3705 "at interpreter shutdown, possibly due to "
3706 "daemon threads".format_map(locals()),
3707 err)
3708 else:
3709 self.assertFalse(err.strip('.!'))
3710
3711 def test_daemon_threads_shutdown_stdout_deadlock(self):
3712 self.check_daemon_threads_shutdown_deadlock('stdout')
3713
3714 def test_daemon_threads_shutdown_stderr_deadlock(self):
3715 self.check_daemon_threads_shutdown_deadlock('stderr')
3716
3717
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003718class PyMiscIOTest(MiscIOTest):
3719 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003720
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003721
3722@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3723class SignalsTest(unittest.TestCase):
3724
3725 def setUp(self):
3726 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3727
3728 def tearDown(self):
3729 signal.signal(signal.SIGALRM, self.oldalrm)
3730
3731 def alarm_interrupt(self, sig, frame):
3732 1/0
3733
3734 @unittest.skipUnless(threading, 'Threading required for this test.')
3735 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3736 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00003737 invokes the signal handler, and bubbles up the exception raised
3738 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003739 read_results = []
3740 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02003741 if hasattr(signal, 'pthread_sigmask'):
3742 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003743 s = os.read(r, 1)
3744 read_results.append(s)
3745 t = threading.Thread(target=_read)
3746 t.daemon = True
3747 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00003748 fdopen_kwargs["closefd"] = False
Victor Stinner9c182a12015-03-16 18:03:06 +01003749 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003750 try:
3751 wio = self.io.open(w, **fdopen_kwargs)
3752 t.start()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003753 # Fill the pipe enough that the write will be blocking.
3754 # It will be interrupted by the timer armed above. Since the
3755 # other thread has read one byte, the low-level write will
3756 # return with a successful (partial) result rather than an EINTR.
3757 # The buffered IO layer must check for pending signal
3758 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02003759 signal.alarm(1)
3760 try:
Victor Stinner9c182a12015-03-16 18:03:06 +01003761 self.assertRaises(ZeroDivisionError, wio.write, large_data)
Victor Stinner775b2dd2013-07-15 19:53:13 +02003762 finally:
3763 signal.alarm(0)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003764 t.join()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003765 # We got one byte, get another one and check that it isn't a
3766 # repeat of the first one.
3767 read_results.append(os.read(r, 1))
3768 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3769 finally:
3770 os.close(w)
3771 os.close(r)
3772 # This is deliberate. If we didn't close the file descriptor
3773 # before closing wio, wio would try to flush its internal
3774 # buffer, and block again.
3775 try:
3776 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003777 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003778 if e.errno != errno.EBADF:
3779 raise
3780
3781 def test_interrupted_write_unbuffered(self):
3782 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3783
3784 def test_interrupted_write_buffered(self):
3785 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3786
Victor Stinner6ab72862014-09-03 23:32:28 +02003787 # Issue #22331: The test hangs on FreeBSD 7.2
3788 @support.requires_freebsd_version(8)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003789 def test_interrupted_write_text(self):
3790 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3791
Brett Cannon31f59292011-02-21 19:29:56 +00003792 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003793 def check_reentrant_write(self, data, **fdopen_kwargs):
3794 def on_alarm(*args):
3795 # Will be called reentrantly from the same thread
3796 wio.write(data)
3797 1/0
3798 signal.signal(signal.SIGALRM, on_alarm)
3799 r, w = os.pipe()
3800 wio = self.io.open(w, **fdopen_kwargs)
3801 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003802 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003803 # Either the reentrant call to wio.write() fails with RuntimeError,
3804 # or the signal handler raises ZeroDivisionError.
3805 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3806 while 1:
3807 for i in range(100):
3808 wio.write(data)
3809 wio.flush()
3810 # Make sure the buffer doesn't fill up and block further writes
3811 os.read(r, len(data) * 100)
3812 exc = cm.exception
3813 if isinstance(exc, RuntimeError):
3814 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3815 finally:
3816 wio.close()
3817 os.close(r)
3818
3819 def test_reentrant_write_buffered(self):
3820 self.check_reentrant_write(b"xy", mode="wb")
3821
3822 def test_reentrant_write_text(self):
3823 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3824
Antoine Pitrou707ce822011-02-25 21:24:11 +00003825 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3826 """Check that a buffered read, when it gets interrupted (either
3827 returning a partial result or EINTR), properly invokes the signal
3828 handler and retries if the latter returned successfully."""
3829 r, w = os.pipe()
3830 fdopen_kwargs["closefd"] = False
3831 def alarm_handler(sig, frame):
3832 os.write(w, b"bar")
3833 signal.signal(signal.SIGALRM, alarm_handler)
3834 try:
3835 rio = self.io.open(r, **fdopen_kwargs)
3836 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003837 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003838 # Expected behaviour:
3839 # - first raw read() returns partial b"foo"
3840 # - second raw read() returns EINTR
3841 # - third raw read() returns b"bar"
3842 self.assertEqual(decode(rio.read(6)), "foobar")
3843 finally:
3844 rio.close()
3845 os.close(w)
3846 os.close(r)
3847
Antoine Pitrou20db5112011-08-19 20:32:34 +02003848 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003849 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3850 mode="rb")
3851
Antoine Pitrou20db5112011-08-19 20:32:34 +02003852 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003853 self.check_interrupted_read_retry(lambda x: x,
3854 mode="r")
3855
3856 @unittest.skipUnless(threading, 'Threading required for this test.')
3857 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3858 """Check that a buffered write, when it gets interrupted (either
3859 returning a partial result or EINTR), properly invokes the signal
3860 handler and retries if the latter returned successfully."""
3861 select = support.import_module("select")
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003862
Antoine Pitrou707ce822011-02-25 21:24:11 +00003863 # A quantity that exceeds the buffer size of an anonymous pipe's
3864 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02003865 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00003866 r, w = os.pipe()
3867 fdopen_kwargs["closefd"] = False
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003868
Antoine Pitrou707ce822011-02-25 21:24:11 +00003869 # We need a separate thread to read from the pipe and allow the
3870 # write() to finish. This thread is started after the SIGALRM is
3871 # received (forcing a first EINTR in write()).
3872 read_results = []
3873 write_finished = False
Serhiy Storchaka9db55002015-03-28 20:38:37 +02003874 error = None
Antoine Pitrou707ce822011-02-25 21:24:11 +00003875 def _read():
Serhiy Storchaka9db55002015-03-28 20:38:37 +02003876 try:
3877 while not write_finished:
3878 while r in select.select([r], [], [], 1.0)[0]:
3879 s = os.read(r, 1024)
3880 read_results.append(s)
3881 except BaseException as exc:
3882 nonlocal error
3883 error = exc
Antoine Pitrou707ce822011-02-25 21:24:11 +00003884 t = threading.Thread(target=_read)
3885 t.daemon = True
3886 def alarm1(sig, frame):
3887 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003888 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003889 def alarm2(sig, frame):
3890 t.start()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003891
3892 large_data = item * N
Antoine Pitrou707ce822011-02-25 21:24:11 +00003893 signal.signal(signal.SIGALRM, alarm1)
3894 try:
3895 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003896 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003897 # Expected behaviour:
3898 # - first raw write() is partial (because of the limited pipe buffer
3899 # and the first alarm)
3900 # - second raw write() returns EINTR (because of the second alarm)
3901 # - subsequent write()s are successful (either partial or complete)
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003902 written = wio.write(large_data)
3903 self.assertEqual(N, written)
3904
Antoine Pitrou707ce822011-02-25 21:24:11 +00003905 wio.flush()
3906 write_finished = True
3907 t.join()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003908
Serhiy Storchaka9db55002015-03-28 20:38:37 +02003909 self.assertIsNone(error)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003910 self.assertEqual(N, sum(len(x) for x in read_results))
3911 finally:
3912 write_finished = True
3913 os.close(w)
3914 os.close(r)
3915 # This is deliberate. If we didn't close the file descriptor
3916 # before closing wio, wio would try to flush its internal
3917 # buffer, and could block (in case of failure).
3918 try:
3919 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003920 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00003921 if e.errno != errno.EBADF:
3922 raise
3923
Antoine Pitrou20db5112011-08-19 20:32:34 +02003924 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003925 self.check_interrupted_write_retry(b"x", mode="wb")
3926
Antoine Pitrou20db5112011-08-19 20:32:34 +02003927 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003928 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3929
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003930
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003931class CSignalsTest(SignalsTest):
3932 io = io
3933
3934class PySignalsTest(SignalsTest):
3935 io = pyio
3936
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003937 # Handling reentrancy issues would slow down _pyio even more, so the
3938 # tests are disabled.
3939 test_reentrant_write_buffered = None
3940 test_reentrant_write_text = None
3941
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003942
Ezio Melottidaa42c72013-03-23 16:30:16 +02003943def load_tests(*args):
Gregory P. Smith1bef9072015-04-14 13:24:34 -07003944 tests = (CIOTest, PyIOTest, APIMismatchTest,
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003945 CBufferedReaderTest, PyBufferedReaderTest,
3946 CBufferedWriterTest, PyBufferedWriterTest,
3947 CBufferedRWPairTest, PyBufferedRWPairTest,
3948 CBufferedRandomTest, PyBufferedRandomTest,
3949 StatefulIncrementalDecoderTest,
3950 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3951 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003952 CMiscIOTest, PyMiscIOTest,
3953 CSignalsTest, PySignalsTest,
3954 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003955
3956 # Put the namespaces of the IO module we are testing and some useful mock
3957 # classes in the __dict__ of each test.
3958 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00003959 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003960 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3961 c_io_ns = {name : getattr(io, name) for name in all_members}
3962 py_io_ns = {name : getattr(pyio, name) for name in all_members}
3963 globs = globals()
3964 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3965 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3966 # Avoid turning open into a bound method.
3967 py_io_ns["open"] = pyio.OpenWrapper
3968 for test in tests:
3969 if test.__name__.startswith("C"):
3970 for name, obj in c_io_ns.items():
3971 setattr(test, name, obj)
3972 elif test.__name__.startswith("Py"):
3973 for name, obj in py_io_ns.items():
3974 setattr(test, name, obj)
3975
Ezio Melottidaa42c72013-03-23 16:30:16 +02003976 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
3977 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003978
3979if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02003980 unittest.main()