blob: aaa64eadffe1ea339fea217bc65eed63f8bf8d8b [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Berker Peksagce643912015-05-06 06:33:17 +030038from test.support.script_helper import assert_python_ok, run_python_until_end
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000047
try:
    import ctypes
except ImportError:
    # ctypes is unavailable: fall back to a signed-char array, which is still
    # a writable bytes-like object.
    def byteslike(*pos, **kw):
        return array.array("b", bytes(*pos, **kw))
else:
    class EmptyStruct(ctypes.Structure):
        # Field-less structure; instances are resized to hold the payload.
        pass

    def byteslike(*pos, **kw):
        """Create a bytes-like object having no string or sequence methods"""
        # Arguments are forwarded verbatim to the bytes() constructor.
        payload = bytes(*pos, **kw)
        holder = EmptyStruct()
        # Grow the structure's buffer to the payload size, then copy the
        # payload in through a flat unsigned-byte view.
        ctypes.resize(holder, len(payload))
        memoryview(holder).cast("B")[:] = payload
        return holder
63
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000064def _default_chunk_size():
65 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000066 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000067 return f._CHUNK_SIZE
68
69
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto()."""

    def __init__(self, read_stack=()):
        # Chunks still to be served by readinto(); a None entry makes
        # readinto() return None once.
        self._read_stack = list(read_stack)
        # Every payload handed to write(), in call order.
        self._write_stack = []
        # Total readinto() calls, and how many of them found the stack empty.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        # Store a bytes copy so later mutation of the caller's buffer does
        # not alter what was recorded.
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next queued chunk (or as much of it as fits) into *buf*.

        Returns the number of bytes copied, None when the next queued entry
        is None, or 0 when nothing is queued.
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        head = self._read_stack[0]
        if head is None:
            del self._read_stack[0]
            return None
        size = len(head)
        if size > capacity:
            # Only part of the chunk fits: serve a prefix and keep the rest
            # queued for the next call.
            buf[:] = head[:capacity]
            self._read_stack[0] = head[capacity:]
            return capacity
        del self._read_stack[0]
        buf[:size] = head
        return size

    def truncate(self, pos=None):
        return pos
125
# MockRawIOWithoutRead combined with the C implementation's RawIOBase.
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    pass
128
# MockRawIOWithoutRead combined with the pure-Python (_pyio) RawIOBase.
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    pass
131
132
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus a read() that serves whole queued chunks."""

    def read(self, n=None):
        """Pop and return the next queued chunk, ignoring *n*.

        Returns b"" (and counts an extraneous read) once the queue is empty.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue is expected here; anything else
            # (KeyboardInterrupt included) must propagate rather than be
            # silently turned into an EOF.
            self._extraneous_reads += 1
            return b""
142
# MockRawIO combined with the C implementation's RawIOBase.
class CMockRawIO(MockRawIO, io.RawIOBase):
    pass
145
# MockRawIO combined with the pure-Python (_pyio) RawIOBase.
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000148
Guido van Rossuma9e20242007-03-08 00:43:48 +0000149
class MisbehavedRawIO(MockRawIO):
    """Raw stream whose methods deliberately return inconsistent values."""

    def write(self, b):
        # Claim twice as many bytes as the parent reports.
        reported = super().write(b)
        return reported * 2

    def read(self, n=None):
        # Hand back the parent's result doubled.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Let the parent fill the buffer, then report five times its size.
        super().readinto(buf)
        return 5 * len(buf)
166
# MisbehavedRawIO combined with the C implementation's RawIOBase.
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    pass
169
# MisbehavedRawIO combined with the pure-Python (_pyio) RawIOBase.
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    pass
172
173
class CloseFailureIO(MockRawIO):
    """Raw stream whose first close() marks it closed and raises OSError."""

    closed = 0

    def close(self):
        if self.closed:
            # Already marked closed: later calls are silent no-ops.
            return
        self.closed = 1
        raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000181
# CloseFailureIO combined with the C implementation's RawIOBase.
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    pass
184
# CloseFailureIO combined with the pure-Python (_pyio) RawIOBase.
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    pass
187
188
class MockFileIO:
    """Mixin recording the size of every read()/readinto() result.

    Meant to be placed ahead of a concrete stream class in the bases list;
    the actual I/O is delegated through super().
    """

    def __init__(self, data):
        # One entry per read()/readinto() call: the byte count, or None when
        # the underlying call returned None.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        size = len(chunk) if chunk is not None else None
        self.read_history.append(size)
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000204
# MockFileIO layered over the C implementation's BytesIO.
class CMockFileIO(MockFileIO, io.BytesIO):
    pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000207
# MockFileIO layered over the pure-Python (_pyio) BytesIO.
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    pass
210
211
class MockUnseekableIO:
    """Mixin making a stream report itself as, and behave as, unseekable.

    The concrete subclass must provide an ``UnsupportedOperation`` attribute
    naming the exception class to raise.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def tell(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def truncate(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error
224
# Unseekable BytesIO for the C implementation; supplies the exception class
# that MockUnseekableIO raises.
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    UnsupportedOperation = io.UnsupportedOperation
227
# Unseekable BytesIO for the pure-Python (_pyio) implementation; supplies the
# exception class that MockUnseekableIO raises.
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    UnsupportedOperation = pyio.UnsupportedOperation
230
231
class MockNonBlockWriterIO:
    """Raw writer mixin that can simulate a write that would block.

    After block_on(char), a write whose data contains *char* is cut short
    just before it; a write that starts with *char* returns None and clears
    the blocker.
    """

    def __init__(self):
        # Payloads accepted so far, in call order.
        self._write_stack = []
        # Byte value/sequence that triggers the simulated blocking, if any.
        self._blocker_char = None

    def pop_written(self):
        # Return everything accepted so far and reset the log in place.
        written = b"".join(self._write_stack)
        self._write_stack[:] = []
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        payload = bytes(b)
        if self._blocker_char:
            try:
                cut = payload.index(self._blocker_char)
            except ValueError:
                # Blocker not present: treat as an ordinary full write.
                cut = -1
            if cut > 0:
                # write data up to the first blocker
                self._write_stack.append(payload[:cut])
                return cut
            elif cut == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(payload)
        return len(payload)
275
# Non-blocking writer mock for the C implementation; exposes the matching
# BlockingIOError class as an attribute.
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    BlockingIOError = io.BlockingIOError
278
# Non-blocking writer mock for the pure-Python (_pyio) implementation;
# exposes the matching BlockingIOError class as an attribute.
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    BlockingIOError = pyio.BlockingIOError
281
Guido van Rossuma9e20242007-03-08 00:43:48 +0000282
Guido van Rossum28524c72007-02-27 05:47:44 +0000283class IOTest(unittest.TestCase):
284
    def setUp(self):
        # Remove the scratch file before each test runs.
        support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000287
    def tearDown(self):
        # Remove the scratch file after each test has run.
        support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000290
Guido van Rossum28524c72007-02-27 05:47:44 +0000291 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000292 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000293 f.truncate(0)
294 self.assertEqual(f.tell(), 5)
295 f.seek(0)
296
297 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000298 self.assertEqual(f.seek(0), 0)
299 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000300 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000301 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000302 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000303 buffer = bytearray(b" world\n\n\n")
304 self.assertEqual(f.write(buffer), 9)
305 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000306 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000307 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000308 self.assertEqual(f.seek(-1, 2), 13)
309 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000310
Guido van Rossum87429772007-04-10 21:06:59 +0000311 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000312 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000313 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000314
Guido van Rossum9b76da62007-04-11 01:09:03 +0000315 def read_ops(self, f, buffered=False):
316 data = f.read(5)
317 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000318 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000319 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000320 self.assertEqual(bytes(data), b" worl")
321 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000322 self.assertEqual(f.readinto(data), 2)
323 self.assertEqual(len(data), 5)
324 self.assertEqual(data[:2], b"d\n")
325 self.assertEqual(f.seek(0), 0)
326 self.assertEqual(f.read(20), b"hello world\n")
327 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000328 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000329 self.assertEqual(f.seek(-6, 2), 6)
330 self.assertEqual(f.read(5), b"world")
331 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000332 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000333 self.assertEqual(f.seek(-6, 1), 5)
334 self.assertEqual(f.read(5), b" worl")
335 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000336 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000337 if buffered:
338 f.seek(0)
339 self.assertEqual(f.read(), b"hello world\n")
340 f.seek(6)
341 self.assertEqual(f.read(), b"world\n")
342 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000343 f.seek(0)
344 data = byteslike(5)
345 self.assertEqual(f.readinto1(data), 5)
346 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000347
    # File offset (2 GiB) around which the large-file operations are run.
    LARGE = 2**31
349
Guido van Rossum53807da2007-04-10 19:01:47 +0000350 def large_file_ops(self, f):
351 assert f.readable()
352 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100353 try:
354 self.assertEqual(f.seek(self.LARGE), self.LARGE)
355 except (OverflowError, ValueError):
356 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000357 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000358 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000359 self.assertEqual(f.tell(), self.LARGE + 3)
360 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000361 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000362 self.assertEqual(f.tell(), self.LARGE + 2)
363 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000364 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000365 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000366 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
367 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000368 self.assertEqual(f.read(2), b"x")
369
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000370 def test_invalid_operations(self):
371 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000372 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000373 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000374 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000375 self.assertRaises(exc, fp.read)
376 self.assertRaises(exc, fp.readline)
377 with self.open(support.TESTFN, "wb", buffering=0) as fp:
378 self.assertRaises(exc, fp.read)
379 self.assertRaises(exc, fp.readline)
380 with self.open(support.TESTFN, "rb", buffering=0) as fp:
381 self.assertRaises(exc, fp.write, b"blah")
382 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000383 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000384 self.assertRaises(exc, fp.write, b"blah")
385 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000386 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000387 self.assertRaises(exc, fp.write, "blah")
388 self.assertRaises(exc, fp.writelines, ["blah\n"])
389 # Non-zero seeking from current or end pos
390 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
391 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000392
    def test_optional_abilities(self):
        # Test for OSError when optional APIs are not supported
        # The purpose of this test is to try fileno(), reading, writing and
        # seeking operations with various objects that indicate they do not
        # support these operations.

        # Each factory below returns a fresh stream object; the table further
        # down pairs it with the abilities that object is expected to have.

        def pipe_reader():
            [r, w] = os.pipe()
            os.close(w)  # So that read() is harmless
            return self.FileIO(r, "r")

        def pipe_writer():
            [r, w] = os.pipe()
            self.addCleanup(os.close, r)
            # Guarantee that we can write into the pipe without blocking
            thread = threading.Thread(target=os.read, args=(r, 100))
            thread.start()
            self.addCleanup(thread.join)
            return self.FileIO(w, "w")

        def buffered_reader():
            return self.BufferedReader(self.MockUnseekableIO())

        def buffered_writer():
            return self.BufferedWriter(self.MockUnseekableIO())

        def buffered_random():
            return self.BufferedRandom(self.BytesIO())

        def buffered_rw_pair():
            return self.BufferedRWPair(self.MockUnseekableIO(),
                self.MockUnseekableIO())

        def text_reader():
            # Replace the writing methods with the BufferedIOBase versions.
            class UnseekableReader(self.MockUnseekableIO):
                writable = self.BufferedIOBase.writable
                write = self.BufferedIOBase.write
            return self.TextIOWrapper(UnseekableReader(), "ascii")

        def text_writer():
            # Replace the reading methods with the BufferedIOBase versions.
            class UnseekableWriter(self.MockUnseekableIO):
                readable = self.BufferedIOBase.readable
                read = self.BufferedIOBase.read
            return self.TextIOWrapper(UnseekableWriter(), "ascii")

        # Ability letters, as checked in the loop below:
        # "f" = fileno() works, "r" = readable, "w" = writable, "s" = seekable.
        tests = (
            (pipe_reader, "fr"), (pipe_writer, "fw"),
            (buffered_reader, "r"), (buffered_writer, "w"),
            (buffered_random, "rws"), (buffered_rw_pair, "rw"),
            (text_reader, "r"), (text_writer, "w"),
            (self.BytesIO, "rws"), (self.StringIO, "rws"),
        )
        for [test, abilities] in tests:
            if test is pipe_writer and not threading:
                continue  # Skip subtest that uses a background thread
            with self.subTest(test), test() as obj:
                readable = "r" in abilities
                self.assertEqual(obj.readable(), readable)
                writable = "w" in abilities
                self.assertEqual(obj.writable(), writable)

                # Pick a payload of the type the stream accepts.
                if isinstance(obj, self.TextIOBase):
                    data = "3"
                elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
                    data = b"3"
                else:
                    self.fail("Unknown base class")

                if "f" in abilities:
                    obj.fileno()
                else:
                    self.assertRaises(OSError, obj.fileno)

                if readable:
                    obj.read(1)
                    obj.read()
                else:
                    self.assertRaises(OSError, obj.read, 1)
                    self.assertRaises(OSError, obj.read)

                if writable:
                    obj.write(data)
                else:
                    self.assertRaises(OSError, obj.write, data)

                if sys.platform.startswith("win") and test in (
                        pipe_reader, pipe_writer):
                    # Pipes seem to appear as seekable on Windows
                    continue
                seekable = "s" in abilities
                self.assertEqual(obj.seekable(), seekable)

                if seekable:
                    obj.tell()
                    obj.seek(0)
                else:
                    self.assertRaises(OSError, obj.tell)
                    self.assertRaises(OSError, obj.seek, 0)

                # truncate() is expected to work only when the object is both
                # writable and seekable.
                if writable and seekable:
                    obj.truncate()
                    obj.truncate(0)
                else:
                    self.assertRaises(OSError, obj.truncate)
                    self.assertRaises(OSError, obj.truncate, 0)
498
Antoine Pitrou13348842012-01-29 18:36:34 +0100499 def test_open_handles_NUL_chars(self):
500 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300501 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100502
503 bytes_fn = bytes(fn_with_NUL, 'ascii')
504 with warnings.catch_warnings():
505 warnings.simplefilter("ignore", DeprecationWarning)
506 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100507
Guido van Rossum28524c72007-02-27 05:47:44 +0000508 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000509 with self.open(support.TESTFN, "wb", buffering=0) as f:
510 self.assertEqual(f.readable(), False)
511 self.assertEqual(f.writable(), True)
512 self.assertEqual(f.seekable(), True)
513 self.write_ops(f)
514 with self.open(support.TESTFN, "rb", buffering=0) as f:
515 self.assertEqual(f.readable(), True)
516 self.assertEqual(f.writable(), False)
517 self.assertEqual(f.seekable(), True)
518 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000519
Guido van Rossum87429772007-04-10 21:06:59 +0000520 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000521 with self.open(support.TESTFN, "wb") as f:
522 self.assertEqual(f.readable(), False)
523 self.assertEqual(f.writable(), True)
524 self.assertEqual(f.seekable(), True)
525 self.write_ops(f)
526 with self.open(support.TESTFN, "rb") as f:
527 self.assertEqual(f.readable(), True)
528 self.assertEqual(f.writable(), False)
529 self.assertEqual(f.seekable(), True)
530 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000531
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000532 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000533 with self.open(support.TESTFN, "wb") as f:
534 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
535 with self.open(support.TESTFN, "rb") as f:
536 self.assertEqual(f.readline(), b"abc\n")
537 self.assertEqual(f.readline(10), b"def\n")
538 self.assertEqual(f.readline(2), b"xy")
539 self.assertEqual(f.readline(4), b"zzy\n")
540 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000541 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000542 self.assertRaises(TypeError, f.readline, 5.3)
543 with self.open(support.TESTFN, "r") as f:
544 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000545
Guido van Rossum28524c72007-02-27 05:47:44 +0000546 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000547 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000548 self.write_ops(f)
549 data = f.getvalue()
550 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000551 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000552 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000553
Guido van Rossum53807da2007-04-10 19:01:47 +0000554 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000555 # On Windows and Mac OSX this test comsumes large resources; It takes
556 # a long time to build the >2GB file and takes >2GB of disk space
557 # therefore the resource must be enabled to run this test.
558 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600559 support.requires(
560 'largefile',
561 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000562 with self.open(support.TESTFN, "w+b", 0) as f:
563 self.large_file_ops(f)
564 with self.open(support.TESTFN, "w+b") as f:
565 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000566
567 def test_with_open(self):
568 for bufsize in (0, 1, 100):
569 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000570 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000571 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000572 self.assertEqual(f.closed, True)
573 f = None
574 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000575 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000576 1/0
577 except ZeroDivisionError:
578 self.assertEqual(f.closed, True)
579 else:
580 self.fail("1/0 didn't raise an exception")
581
Antoine Pitrou08838b62009-01-21 00:55:13 +0000582 # issue 5008
583 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000584 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000585 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000586 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000587 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000588 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000589 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000590 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300591 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000592
Guido van Rossum87429772007-04-10 21:06:59 +0000593 def test_destructor(self):
594 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000595 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000596 def __del__(self):
597 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000598 try:
599 f = super().__del__
600 except AttributeError:
601 pass
602 else:
603 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000604 def close(self):
605 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000606 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000607 def flush(self):
608 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000609 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000610 with support.check_warnings(('', ResourceWarning)):
611 f = MyFileIO(support.TESTFN, "wb")
612 f.write(b"xxx")
613 del f
614 support.gc_collect()
615 self.assertEqual(record, [1, 2, 3])
616 with self.open(support.TESTFN, "rb") as f:
617 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000618
619 def _check_base_destructor(self, base):
620 record = []
621 class MyIO(base):
622 def __init__(self):
623 # This exercises the availability of attributes on object
624 # destruction.
625 # (in the C version, close() is called by the tp_dealloc
626 # function, not by __del__)
627 self.on_del = 1
628 self.on_close = 2
629 self.on_flush = 3
630 def __del__(self):
631 record.append(self.on_del)
632 try:
633 f = super().__del__
634 except AttributeError:
635 pass
636 else:
637 f()
638 def close(self):
639 record.append(self.on_close)
640 super().close()
641 def flush(self):
642 record.append(self.on_flush)
643 super().flush()
644 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000645 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000646 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000647 self.assertEqual(record, [1, 2, 3])
648
    def test_IOBase_destructor(self):
        # Run the shared destructor check against IOBase.
        self._check_base_destructor(self.IOBase)
651
    def test_RawIOBase_destructor(self):
        # Run the shared destructor check against RawIOBase.
        self._check_base_destructor(self.RawIOBase)
654
    def test_BufferedIOBase_destructor(self):
        # Run the shared destructor check against BufferedIOBase.
        self._check_base_destructor(self.BufferedIOBase)
657
    def test_TextIOBase_destructor(self):
        # Run the shared destructor check against TextIOBase.
        self._check_base_destructor(self.TextIOBase)
660
Guido van Rossum87429772007-04-10 21:06:59 +0000661 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000662 with self.open(support.TESTFN, "wb") as f:
663 f.write(b"xxx")
664 with self.open(support.TESTFN, "rb") as f:
665 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000666
Guido van Rossumd4103952007-04-12 05:44:49 +0000667 def test_array_writes(self):
668 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000669 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000670 def check(f):
671 with f:
672 self.assertEqual(f.write(a), n)
673 f.writelines((a,))
674 check(self.BytesIO())
675 check(self.FileIO(support.TESTFN, "w"))
676 check(self.BufferedWriter(self.MockRawIO()))
677 check(self.BufferedRandom(self.MockRawIO()))
678 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000679
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000680 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000681 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000682 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000683
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000684 def test_read_closed(self):
685 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000686 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000687 with self.open(support.TESTFN, "r") as f:
688 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000689 self.assertEqual(file.read(), "egg\n")
690 file.seek(0)
691 file.close()
692 self.assertRaises(ValueError, file.read)
693
694 def test_no_closefd_with_filename(self):
695 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000696 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000697
698 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000699 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000700 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000701 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000702 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000703 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000704 self.assertEqual(file.buffer.raw.closefd, False)
705
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000706 def test_garbage_collection(self):
707 # FileIO objects are collected, and collecting them flushes
708 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000709 with support.check_warnings(('', ResourceWarning)):
710 f = self.FileIO(support.TESTFN, "wb")
711 f.write(b"abcxxx")
712 f.f = f
713 wr = weakref.ref(f)
714 del f
715 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300716 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000717 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000718 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000719
def test_unbounded_file(self):
    # Issue #1174606: reading from an unbounded stream such as /dev/zero.
    zero = "/dev/zero"
    # Skip unless all three preconditions hold: the device exists, the
    # address space is 32-bit, and at least 2GB of memory is allowed.
    if not os.path.exists(zero):
        self.skipTest("{0} does not exist".format(zero))
    if sys.maxsize > 0x7FFFFFFF:
        self.skipTest("test can only run in a 32-bit address space")
    if support.real_max_memuse < support._2G:
        self.skipTest("test requires at least 2GB of memory")
    # An unbounded read() must fail with OverflowError at each layer:
    # raw (buffering=0), buffered binary, and text.
    with self.open(zero, "rb", buffering=0) as f:
        self.assertRaises(OverflowError, f.read)
    with self.open(zero, "rb") as f:
        self.assertRaises(OverflowError, f.read)
    with self.open(zero, "r") as f:
        self.assertRaises(OverflowError, f.read)
735
def check_flush_error_on_close(self, *args, **kwargs):
    # Test that the file is closed despite failed flush
    # and that flush() is called before file closed.
    #
    # *args and **kwargs are passed straight through to self.open().
    f = self.open(*args, **kwargs)
    # Filled by bad_flush() with the value of f.closed at the time flush()
    # ran; stays empty if flush() is never called.
    closed = []
    def bad_flush():
        closed[:] = [f.closed]
        raise OSError()
    f.flush = bad_flush
    self.assertRaises(OSError, f.close) # exception not swallowed
    self.assertTrue(f.closed)
    self.assertTrue(closed)      # flush() called
    self.assertFalse(closed[0])  # flush() called before file closed
    f.flush = lambda: None  # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200750
def test_flush_error_on_close(self):
    # Run check_flush_error_on_close() for the raw, buffered and text
    # layers. Each layer is opened three ways: by file name, by file
    # descriptor, and by file descriptor with closefd=False. In the last
    # case the test closes the descriptor itself with os.close().

    # raw file
    # Issue #5700: io.FileIO calls flush() after file closed
    self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
    fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
    self.check_flush_error_on_close(fd, 'wb', buffering=0)
    fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
    self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
    os.close(fd)
    # buffered io
    self.check_flush_error_on_close(support.TESTFN, 'wb')
    fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
    self.check_flush_error_on_close(fd, 'wb')
    fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
    self.check_flush_error_on_close(fd, 'wb', closefd=False)
    os.close(fd)
    # text io
    self.check_flush_error_on_close(support.TESTFN, 'w')
    fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
    self.check_flush_error_on_close(fd, 'w')
    fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
    self.check_flush_error_on_close(fd, 'w', closefd=False)
    os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000774
def test_multi_close(self):
    # Closing an unbuffered file several times must not raise, but
    # flushing it once closed must fail with ValueError.
    f = self.open(support.TESTFN, "wb", buffering=0)
    f.close()
    f.close()
    f.close()
    self.assertRaises(ValueError, f.flush)
781
def test_RawIOBase_read(self):
    # Exercise the default RawIOBase.read() implementation (which calls
    # readinto() internally).
    rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
    # Each read(2) returns at most two bytes and never spans two of the
    # chunks given to the mock.
    self.assertEqual(rawio.read(2), b"ab")
    self.assertEqual(rawio.read(2), b"c")
    self.assertEqual(rawio.read(2), b"d")
    # A None entry in the mock data comes back as None from read().
    self.assertEqual(rawio.read(2), None)
    self.assertEqual(rawio.read(2), b"ef")
    self.assertEqual(rawio.read(2), b"g")
    self.assertEqual(rawio.read(2), None)
    # Once the data is exhausted, read() returns empty bytes.
    self.assertEqual(rawio.read(2), b"")
794
def test_types_have_dict(self):
    # Instances of the base classes and of the in-memory streams must all
    # have a __dict__.
    test = (
        self.IOBase(),
        self.RawIOBase(),
        self.TextIOBase(),
        self.StringIO(),
        self.BytesIO()
    )
    for obj in test:
        self.assertTrue(hasattr(obj, "__dict__"))
805
def test_opener(self):
    # open() must use the descriptor returned by a custom opener instead
    # of opening the given path itself.
    with self.open(support.TESTFN, "w") as f:
        f.write("egg\n")
    fd = os.open(support.TESTFN, os.O_RDONLY)
    def opener(path, flags):
        # Ignore both arguments and return the already-open descriptor.
        return fd
    # The path does not name the file being read; the content can only
    # come from the opener's descriptor.
    with self.open("non-existent", "r", opener=opener) as f:
        self.assertEqual(f.read(), "egg\n")
813 self.assertEqual(f.read(), "egg\n")
814
def test_bad_opener_negative_1(self):
    # Issue #27066.
    # An opener returning -1 must make open() raise ValueError with a
    # message naming the returned value.
    # NOTE(review): this calls the builtin open(), not self.open, so the
    # same implementation is exercised whichever subclass runs the test.
    def badopener(fname, flags):
        return -1
    with self.assertRaises(ValueError) as cm:
        open('non-existent', 'r', opener=badopener)
    self.assertEqual(str(cm.exception), 'opener returned -1')
822
def test_bad_opener_other_negative(self):
    # Issue #27066.
    # Same as the -1 case, for another negative value: open() must raise
    # ValueError with a message naming the returned value.
    # NOTE(review): this calls the builtin open(), not self.open, so the
    # same implementation is exercised whichever subclass runs the test.
    def badopener(fname, flags):
        return -2
    with self.assertRaises(ValueError) as cm:
        open('non-existent', 'r', opener=badopener)
    self.assertEqual(str(cm.exception), 'opener returned -2')
830
def test_fileio_closefd(self):
    # Issue #4841
    # A FileIO created with closefd=False must never close the
    # descriptor it wraps, neither when re-initialized nor when closed.
    with self.open(__file__, 'rb') as f1, \
         self.open(__file__, 'rb') as f2:
        fileio = self.FileIO(f1.fileno(), closefd=False)
        # .__init__() must not close f1
        fileio.__init__(f2.fileno(), closefd=False)
        f1.readline()
        # .close() must not close f2
        fileio.close()
        f2.readline()
842
def test_nonbuffered_textio(self):
    # Text mode with buffering=0 must be rejected with ValueError, and
    # the failed open() must not trigger a ResourceWarning.
    with support.check_no_resource_warning(self):
        with self.assertRaises(ValueError):
            self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300847
def test_invalid_newline(self):
    # An unsupported newline value must be rejected with ValueError, and
    # the failed open() must not trigger a ResourceWarning.
    with support.check_no_resource_warning(self):
        with self.assertRaises(ValueError):
            self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300852
def test_buffered_readinto_mixin(self):
    # Test the implementation provided by BufferedIOBase
    class Stream(self.BufferedIOBase):
        # Only read()/read1() are defined here; readinto() and
        # readinto1() are inherited from the base class.
        def read(self, size):
            return b"12345"
        read1 = read
    stream = Stream()
    for method in ("readinto", "readinto1"):
        with self.subTest(method):
            buffer = byteslike(5)
            # The inherited method must report 5 bytes and leave the
            # bytes returned by read() in the buffer.
            self.assertEqual(getattr(stream, method)(buffer), 5)
            self.assertEqual(bytes(buffer), b"12345")
865
def test_fspath_support(self):
    # open() must accept objects implementing __fspath__().
    class PathLike:
        # Minimal path-like object: __fspath__() returns whatever was
        # given to the constructor.
        def __init__(self, path):
            self.path = path

        def __fspath__(self):
            return self.path

    def check_path_succeeds(path):
        # Write through the path-like object, then read the text back.
        with self.open(path, "w") as f:
            f.write("egg\n")

        with self.open(path, "r") as f:
            self.assertEqual(f.read(), "egg\n")

    # Both str and bytes results from __fspath__() must work.
    check_path_succeeds(PathLike(support.TESTFN))
    check_path_succeeds(PathLike(support.TESTFN.encode('utf-8')))

    # __fspath__() returning a non-path object (here the TypeError class
    # itself) must make open() raise TypeError.
    bad_path = PathLike(TypeError)
    with self.assertRaises(TypeError):
        self.open(bad_path, 'w')

    # ensure that refcounting is correct with some error conditions
    with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
        self.open(PathLike(support.TESTFN), 'rwxa')
891
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200892
class CIOTest(IOTest):
    # IOTest subclass carrying one extra regression test (issue #12149).

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        # Self-reference puts the instance in a reference cycle.
        obj.obj = obj
        wr = weakref.ref(obj)
        # Drop both the class and the instance before collecting.
        del MyIO
        del obj
        support.gc_collect()
        # The weak reference must be dead, i.e. the object was collected.
        self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000912
class PyIOTest(IOTest):
    # Inherits every IOTest test unchanged and adds none of its own.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000915
Guido van Rossuma9e20242007-03-08 00:43:48 +0000916
@support.cpython_only
class APIMismatchTest(unittest.TestCase):
    # Cross-checks the RawIOBase APIs of the io and pyio modules in both
    # directions using support.detect_api_mismatch(); an empty result set
    # is required.

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        # '__weakref__' is deliberately excluded from the comparison.
        mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
                                               ignore=('__weakref__',))
        self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
930
931
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # This is a mixin. The tests read the class under test from self.tp
    # and the helper raw streams (MockRawIO, CloseFailureIO,
    # MockUnseekableIO) and UnsupportedOperation from attributes on self;
    # none of those are defined in this class.

    def test_detach(self):
        # detach() returns the underlying raw stream; a second detach()
        # must fail with ValueError.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        self.assertRaises(ValueError, buf.detach)

        repr(buf)  # Should still work

    def test_fileno(self):
        # The buffered object must report 42 as its fileno().
        # NOTE(review): presumably the value MockRawIO.fileno() returns;
        # the mock is defined elsewhere in the file.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 9)

    def test_override_destructor(self):
        # When a subclass overrides __del__(), close() and flush(),
        # destroying an instance must call them in exactly that order.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                # The base class may or may not define __del__; only
                # chain up when it does.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        del bufio
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            # The temporary buffered object is destroyed while the
            # AttributeError from the missing attribute is propagating.
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception OSError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        # repr() shows the qualified class name and, once the raw stream
        # has a name (str or bytes), that name as well.
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw = self.MockRawIO()
        # Filled by bad_flush() with the closed state of the buffered
        # object and of the raw stream at the time flush() ran.
        closed = []
        def bad_flush():
            closed[:] = [b.closed, raw.closed]
            raise OSError()
        raw.flush = bad_flush
        b = self.tp(raw)
        self.assertRaises(OSError, b.close) # exception not swallowed
        self.assertTrue(b.closed)
        self.assertTrue(raw.closed)
        self.assertTrue(closed)      # flush() called
        self.assertFalse(closed[0])  # flush() called before file closed
        self.assertFalse(closed[1])
        raw.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and the raw close() fail, the close error is
        # the one raised and the flush error is chained as its
        # __context__; the buffered object is not marked closed.
        raw = self.MockRawIO()
        def bad_flush():
            raise OSError('flush')
        def bad_close():
            raise OSError('close')
        raw.close = bad_close
        b = self.tp(raw)
        b.flush = bad_flush
        with self.assertRaises(OSError) as err: # exception not swallowed
            b.close()
        self.assertEqual(err.exception.args, ('close',))
        self.assertIsInstance(err.exception.__context__, OSError)
        self.assertEqual(err.exception.__context__.args, ('flush',))
        self.assertFalse(b.closed)

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        # Same scenario as test_close_error_on_close, but both failures
        # are NameErrors raised by referencing undefined names.
        raw = self.MockRawIO()
        def bad_flush():
            raise non_existing_flush
        def bad_close():
            raise non_existing_close
        raw.close = bad_close
        b = self.tp(raw)
        b.flush = bad_flush
        with self.assertRaises(NameError) as err: # exception not swallowed
            b.close()
        self.assertIn('non_existing_close', str(err.exception))
        self.assertIsInstance(err.exception.__context__, NameError)
        self.assertIn('non_existing_flush', str(err.exception.__context__))
        self.assertFalse(b.closed)

    def test_multi_close(self):
        # Repeated close() calls must not raise; flush() on the closed
        # object must fail with ValueError.
        raw = self.MockRawIO()
        b = self.tp(raw)
        b.close()
        b.close()
        b.close()
        self.assertRaises(ValueError, b.flush)

    def test_unseekable(self):
        # tell() and seek() on top of an unseekable raw stream must raise
        # UnsupportedOperation.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)

    def test_readonly_attributes(self):
        # The raw attribute of a buffered object cannot be reassigned.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        x = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buf.raw = x
1087
Guido van Rossum78892e42007-04-06 17:31:18 +00001088
class SizeofTest:
    # Mixin with CPython-only checks on how sys.getsizeof() accounts for
    # the internal buffer of a buffered object. Relies on self.tp and
    # self.MockRawIO being provided by the concrete test class.

    @support.cpython_only
    def test_sizeof(self):
        # The reported size must grow by exactly the difference in buffer
        # size between two otherwise identical objects.
        bufsize1 = 4096
        bufsize2 = 8192
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=bufsize1)
        # Size of the object without its buffer.
        size = sys.getsizeof(bufio) - bufsize1
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=bufsize2)
        self.assertEqual(sys.getsizeof(bufio), size + bufsize2)

    @support.cpython_only
    def test_buffer_freeing(self) :
        # After close(), the reported size must no longer include the
        # buffer.
        bufsize = 4096
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=bufsize)
        size = sys.getsizeof(bufio) - bufsize
        bufio.close()
        self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001110
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Tests for buffered readers. The class under test is read from
    # self.tp, which subclasses set. The raw-stream helpers (MockRawIO,
    # MockFileIO, MockUnseekableIO, MisbehavedRawIO), open and
    # UnsupportedOperation are also looked up on self and are not defined
    # in this class.

    # Mode used by test_threads when opening the raw file.
    read_mode = "rb"

    def test_constructor(self):
        # __init__() may be called again on an existing object, with or
        # without an explicit buffer size; non-positive buffer sizes are
        # rejected with ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        # Re-initializing with a fresh raw stream makes the object usable
        # again.
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_uninitialized(self):
        # An object created with __new__() only can be deleted safely,
        # fails cleanly when read, and works once __init__() has run.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        # Either exception type (with the matching message) is accepted.
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.read, 0)
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.read(0), b'')

    def test_read(self):
        # read() gathers data across several raw chunks, both for an
        # unbounded read (None) and for an exact size.
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # read1() must serve from the buffer when it can; rawio._reads
        # is checked after each call to pin the expected number of raw
        # reads.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        # readinto() fills the target buffer across raw chunks and
        # returns the number of bytes stored.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        # Short read: only the first byte is overwritten.
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        # No data left: the buffer is untouched.
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")
        # Same check with a None entry in the raw data.
        rawio = self.MockRawIO((b"abc", None))
        bufio = self.tp(rawio)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"cb")

    def test_readinto1(self):
        # readinto1() counterpart of test_read1: rawio._reads is checked
        # after every step to pin the expected number of raw reads.
        buffer_size = 10
        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
        bufio = self.tp(rawio, buffer_size=buffer_size)
        b = bytearray(2)
        self.assertEqual(bufio.peek(3), b'abc')
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 1)
        self.assertEqual(b[:1], b"c")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"de")
        self.assertEqual(rawio._reads, 2)
        # Target larger than the internal buffer.
        b = bytearray(2*buffer_size)
        self.assertEqual(bufio.peek(3), b'fgh')
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(bufio.readinto1(b), 6)
        self.assertEqual(b[:6], b"fghjkl")
        self.assertEqual(rawio._reads, 4)

    def test_readinto_array(self):
        # readinto() must count bytes, not elements, when the target is
        # an array with multi-byte items.
        buffer_size = 60
        data = b"a" * 26
        rawio = self.MockRawIO((data,))
        bufio = self.tp(rawio, buffer_size=buffer_size)

        # Create an array with element size > 1 byte
        b = array.array('i', b'x' * 32)
        assert len(b) != 16

        # Read into it. We should get as many *bytes* as we can fit into b
        # (which is more than the number of elements)
        n = bufio.readinto(b)
        self.assertGreater(n, len(b))

        # Check that old contents of b are preserved
        bm = memoryview(b).cast('B')
        self.assertLess(n, len(bm))
        self.assertEqual(bm[:n], data[:n])
        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))

    def test_readinto1_array(self):
        # Same as test_readinto_array, for readinto1().
        buffer_size = 60
        data = b"a" * 26
        rawio = self.MockRawIO((data,))
        bufio = self.tp(rawio, buffer_size=buffer_size)

        # Create an array with element size > 1 byte
        b = array.array('i', b'x' * 32)
        assert len(b) != 16

        # Read into it. We should get as many *bytes* as we can fit into b
        # (which is more than the number of elements)
        n = bufio.readinto1(b)
        self.assertGreater(n, len(b))

        # Check that old contents of b are preserved
        bm = memoryview(b).cast('B')
        self.assertLess(n, len(bm))
        self.assertEqual(bm[:n], data[:n])
        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))

    def test_readlines(self):
        # readlines() with no hint, with a size hint and with None.
        def bufio():
            # Fresh reader over the same three chunks for each assertion.
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        # For a given buffer size and sequence of buffered read sizes,
        # check both the data returned and the sizes of the reads issued
        # on the raw stream.
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: [buffer size, buffered read sizes, expected raw
        # read history].
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        # readall() on the raw mock itself: data first, then None.
        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        # Asking for more than is available returns what there is.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        # read() without a size returns everything.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        # Many threads read concurrently from one buffered reader; the
        # pieces they collect must together contain every byte value
        # exactly N times.
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            l = list(range(256)) * N
            random.shuffle(l)
            s = bytes(bytearray(l))
            with self.open(support.TESTFN, "wb") as f:
                f.write(s)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def f():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        # Record the failure for the main thread, then
                        # let it propagate in the worker.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                s = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # tell() and seek() must raise UnsupportedOperation on an
        # unseekable raw stream, both before and after data was read.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        bufio.read(1)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        self.assertRaises(self.UnsupportedOperation, bufio.tell)

    def test_misbehaved_io(self):
        # seek() and tell() over a MisbehavedRawIO stream must raise
        # OSError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))

    def test_read_on_closed(self):
        # Issue #23796
        # peek() and read1() on a closed reader must raise ValueError.
        # NOTE(review): this uses io.BufferedReader directly rather than
        # self.tp, so the same class is exercised by every subclass.
        b = io.BufferedReader(io.BytesIO(b"12"))
        b.read(1)
        b.close()
        self.assertRaises(ValueError, b.peek)
        self.assertRaises(ValueError, b.read1, 1)
1386
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001387
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the BufferedReaderTest and SizeofTest suites against
    # io.BufferedReader and adds tests specific to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            # A buffer size of sys.maxsize must be refused with one of
            # these exceptions.
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed __init__() call, the object must refuse
        # read() with ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(OSError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            # Self-reference puts the object in a reference cycle.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        # The weak reference must be dead, i.e. the object was collected.
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError message must
        # mention "BufferedReader".
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1434
1435
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared BufferedReader tests against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001438
Guido van Rossuma9e20242007-03-08 00:43:48 +00001439
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Shared tests for buffered writers.  The class under test is read from
    # the ``tp`` attribute, which subclasses provide.
    write_mode = "wb"

    def test_constructor(self):
        raw = self.MockRawIO()
        writer = self.tp(raw)
        # __init__ may be called again with valid arguments.
        for kwargs in ({}, {"buffer_size": 1024}, {"buffer_size": 16}):
            writer.__init__(raw, **kwargs)
        self.assertEqual(3, writer.write(b"abc"))
        writer.flush()
        # Zero and negative buffer sizes are rejected.
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, writer.__init__, raw,
                              buffer_size=bad_size)
        writer.__init__(raw)
        self.assertEqual(3, writer.write(b"ghi"))
        writer.flush()
        self.assertEqual(b"".join(raw._write_stack), b"abcghi")

    def test_uninitialized(self):
        # Create and immediately drop an object that was never initialized.
        orphan = self.tp.__new__(self.tp)
        del orphan
        writer = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               writer.write, b'')
        writer.__init__(self.MockRawIO())
        self.assertEqual(writer.write(b''), 0)

    def test_detach_flush(self):
        raw = self.MockRawIO()
        writer = self.tp(raw)
        writer.write(b"howdy!")
        # Nothing reaches the raw stream until detach() is called.
        self.assertFalse(raw._write_stack)
        writer.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        raw = self.MockRawIO()
        writer = self.tp(raw, 8)
        writer.write(b"abc")
        self.assertFalse(raw._write_stack)
        scratch = bytearray(b"def")
        writer.write(scratch)
        scratch[:] = b"***"  # Overwrite our copy of the data
        writer.flush()
        self.assertEqual(b"".join(raw._write_stack), b"abcdef")

    def test_write_overflow(self):
        raw = self.MockRawIO()
        writer = self.tp(raw, 8)
        payload = b"abcdefghijklmnop"
        for start in range(0, len(payload), 3):
            writer.write(payload[start:start + 3])
        flushed = b"".join(raw._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(payload[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is called after every single write.
        contents = bytes(range(256)) * 1000
        raw = self.MockRawIO()
        writer = self.tp(raw, 13)
        # Write sizes: each N is used 15 times before moving on to N + 1.
        sizes = (size for size in count(1) for _ in range(15))
        offset = 0
        while offset < len(contents):
            size = min(next(sizes), len(contents) - offset)
            chunk = contents[offset:offset + size]
            self.assertEqual(writer.write(chunk), size)
            intermediate_func(writer)
            offset += size
        writer.flush()
        self.assertEqual(contents, b"".join(raw._write_stack))

    def test_writes(self):
        def do_nothing(bufio):
            return None
        self.check_writes(do_nothing)

    def test_writes_and_flushes(self):
        def flush(bufio):
            return bufio.flush()
        self.check_writes(flush)

    def test_writes_and_seeks(self):
        def seek_absolute(bufio):
            here = bufio.tell()
            # One byte forward, one byte back, then the original position.
            for target in (here + 1, here - 1, here):
                bufio.seek(target, 0)
        self.check_writes(seek_absolute)

        def seek_relative(bufio):
            here = bufio.seek(0, 1)
            for delta in (+1, -1):
                bufio.seek(delta, 1)
            bufio.seek(here, 0)
        self.check_writes(seek_relative)

    def test_writes_and_truncates(self):
        def truncate_here(bufio):
            return bufio.truncate(bufio.tell())
        self.check_writes(truncate_here)

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        writer = self.tp(raw, 8)

        self.assertEqual(writer.write(b"abcd"), 4)
        self.assertEqual(writer.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(writer.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            writer.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as exc:
            accepted = exc.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(accepted, 16)
        self.assertEqual(raw.pop_written(),
                         b"abcdefghijklmnopqrwxyz")

        self.assertEqual(writer.write(b"ABCDEFGHI"), 9)
        flushed = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(flushed.startswith(b"01234567A"), flushed)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        writer = self.tp(raw, 4)
        self.assertEqual(writer.write(b"abcdef"), 6)
        self.assertEqual(writer.tell(), 6)
        # Go back to the start and overwrite the first two bytes.
        writer.seek(0, 0)
        self.assertEqual(writer.write(b"XY"), 2)
        writer.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(writer.write(b"123456"), 6)
        writer.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        raw = self.MockRawIO()
        writer = self.tp(raw, 8)
        writer.write(b"abc")
        writer.flush()
        self.assertEqual(b"abc", raw._write_stack[0])

    def test_writelines(self):
        lines = [b'ab', b'cd', b'ef']
        raw = self.MockRawIO()
        writer = self.tp(raw, 8)
        writer.writelines(lines)
        writer.flush()
        self.assertEqual(b''.join(raw._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # Same as test_writelines, with a UserList instead of a list.
        lines = UserList([b'ab', b'cd', b'ef'])
        raw = self.MockRawIO()
        writer = self.tp(raw, 8)
        writer.writelines(lines)
        writer.flush()
        self.assertEqual(b''.join(raw._write_stack), b'abcdef')

    def test_writelines_error(self):
        raw = self.MockRawIO()
        writer = self.tp(raw, 8)
        for bad_lines in ([1, 2, 3], None, 'abc'):
            self.assertRaises(TypeError, writer.writelines, bad_lines)

    def test_destructor(self):
        raw = self.MockRawIO()
        writer = self.tp(raw, 8)
        writer.write(b"abc")
        # The buffered data is expected in the raw stream once the writer
        # has been dropped and collected.
        del writer
        support.gc_collect()
        self.assertEqual(b"abc", raw._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            writer = self.tp(raw, 8)
            writer.write(b"abcdef")
            self.assertEqual(writer.truncate(3), 3)
            self.assertEqual(writer.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            # Cut the data into chunks of alternating sizes 1 and 19.
            chunk_sizes = cycle([1, 19])
            pending = deque()
            offset = 0
            while offset < len(contents):
                step = next(chunk_sizes)
                pending.append(contents[offset:offset + step])
                offset += step
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                writer = self.tp(raw, 8)
                errors = []

                def worker():
                    # Drain the shared queue; record and re-raise any failure.
                    try:
                        while True:
                            try:
                                chunk = pending.popleft()
                            except IndexError:
                                return
                            writer.write(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise

                threads = [threading.Thread(target=worker) for _ in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02)  # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                writer.close()
            with self.open(support.TESTFN, "rb") as f:
                written = f.read()
            # Chunks may land in any order, so only per-byte counts are
            # compared.
            for value in range(256):
                self.assertEqual(written.count(bytes([value])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        raw = self.MisbehavedRawIO()
        writer = self.tp(raw, 5)
        with self.assertRaises(OSError):
            writer.seek(0)
        with self.assertRaises(OSError):
            writer.tell()
        with self.assertRaises(OSError):
            writer.write(b"abcdef")

    def test_max_buffer_size_removal(self):
        # A third positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()

        def failing_write(b):
            raise OSError()

        raw.write = failing_write
        writer = self.tp(raw)
        writer.write(b'spam')
        self.assertRaises(OSError, writer.close)  # exception not swallowed
        self.assertTrue(writer.closed)
1699
Benjamin Peterson59406a92009-03-26 17:10:29 +00001700
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared BufferedWriter tests against io.BufferedWriter and
    # adds a few checks of its own.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel, so the oversized buffer is only
        # requested when sys.maxsize does not fit in 32 bits.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        writer = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            writer.__init__(raw, sys.maxsize)

    def test_initialization(self):
        raw = self.MockRawIO()
        writer = self.tp(raw)
        # After each rejected buffer size, write() is expected to fail too.
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, writer.__init__, raw,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, writer.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends up in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            raw = self.FileIO(support.TESTFN, "w+b")
            writer = self.tp(raw)
            writer.write(b"123xxx")
            writer.x = writer  # make the object reference itself
            ref = weakref.ref(writer)
            del writer
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275: the TypeError message should name the class.
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1744
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001745
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriter tests against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001748
class BufferedRWPairTest(unittest.TestCase):
    # Shared tests for BufferedRWPair.  The class under test is read from
    # the ``tp`` attribute, which subclasses provide.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_uninitialized(self):
        # Create and immediately drop an object that was never initialized.
        pair = self.tp.__new__(self.tp)
        del pair
        pair = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               pair.read, 0)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               pair.write, b'')
        pair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(pair.read(0), b'')
        self.assertEqual(pair.write(b''), 0)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        # read(None) reads everything, like read() with no argument.
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # Each call gets a fresh pair, since readlines() consumes the reader.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # An explicit None hint means "no hint", same as omitting it.
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        for method in ("readinto", "readinto1"):
            with self.subTest(method):
                pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

                data = byteslike(b'\0' * 5)
                self.assertEqual(getattr(pair, method)(data), 5)
                self.assertEqual(bytes(data), b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        buffer = bytearray(b"def")
        pair.write(buffer)
        buffer[:] = b"***"  # Overwrite our copy of the data
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() may return more than asked for; it must not consume data.
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_reader_close_error_on_close(self):
        def reader_close():
            # Referencing an undefined name raises NameError on purpose.
            reader_non_existing
        reader = self.MockRawIO()
        reader.close = reader_close
        writer = self.MockRawIO()
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        self.assertIn('reader_non_existing', str(err.exception))
        # The writer is still closed even though the reader's close() failed.
        self.assertTrue(pair.closed)
        self.assertFalse(reader.closed)
        self.assertTrue(writer.closed)

    def test_writer_close_error_on_close(self):
        def writer_close():
            # Referencing an undefined name raises NameError on purpose.
            writer_non_existing
        reader = self.MockRawIO()
        writer = self.MockRawIO()
        writer.close = writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        self.assertIn('writer_non_existing', str(err.exception))
        # The reader is still closed even though the writer's close() failed.
        self.assertFalse(pair.closed)
        self.assertTrue(reader.closed)
        self.assertFalse(writer.closed)

    def test_reader_writer_close_error_on_close(self):
        def reader_close():
            reader_non_existing
        def writer_close():
            writer_non_existing
        reader = self.MockRawIO()
        reader.close = reader_close
        writer = self.MockRawIO()
        writer.close = writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        # The reader's error is the one raised; the writer's error is kept
        # as its __context__.
        self.assertIn('reader_non_existing', str(err.exception))
        self.assertIsInstance(err.exception.__context__, NameError)
        self.assertIn('writer_non_existing', str(err.exception.__context__))
        self.assertFalse(pair.closed)
        self.assertFalse(reader.closed)
        self.assertFalse(writer.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side is one.
        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

    def test_weakref_clearing(self):
        # Drop the pair first, then the weak reference to it.
        brw = self.tp(self.MockRawIO(), self.MockRawIO())
        ref = weakref.ref(brw)
        brw = None
        ref = None  # Shouldn't segfault.
1936
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPair tests against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001939
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPair tests against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001942
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001943
1944class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1945 read_mode = "rb+"
1946 write_mode = "wb+"
1947
1948 def test_constructor(self):
1949 BufferedReaderTest.test_constructor(self)
1950 BufferedWriterTest.test_constructor(self)
1951
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001952 def test_uninitialized(self):
1953 BufferedReaderTest.test_uninitialized(self)
1954 BufferedWriterTest.test_uninitialized(self)
1955
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001956 def test_read_and_write(self):
1957 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001958 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001959
1960 self.assertEqual(b"as", rw.read(2))
1961 rw.write(b"ddd")
1962 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001963 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001964 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001965 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001966
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001967 def test_seek_and_tell(self):
1968 raw = self.BytesIO(b"asdfghjkl")
1969 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001970
Ezio Melottib3aedd42010-11-20 19:04:17 +00001971 self.assertEqual(b"as", rw.read(2))
1972 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001973 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001974 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001975
Antoine Pitroue05565e2011-08-20 14:39:23 +02001976 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001977 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001978 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001979 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001980 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001981 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001982 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001983 self.assertEqual(7, rw.tell())
1984 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001985 rw.flush()
1986 self.assertEqual(b"asdf123fl", raw.getvalue())
1987
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001988 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001989
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001990 def check_flush_and_read(self, read_func):
1991 raw = self.BytesIO(b"abcdefghi")
1992 bufio = self.tp(raw)
1993
Ezio Melottib3aedd42010-11-20 19:04:17 +00001994 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001995 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001996 self.assertEqual(b"ef", read_func(bufio, 2))
1997 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001998 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001999 self.assertEqual(6, bufio.tell())
2000 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002001 raw.seek(0, 0)
2002 raw.write(b"XYZ")
2003 # flush() resets the read buffer
2004 bufio.flush()
2005 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002006 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002007
2008 def test_flush_and_read(self):
2009 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2010
2011 def test_flush_and_readinto(self):
2012 def _readinto(bufio, n=-1):
2013 b = bytearray(n if n >= 0 else 9999)
2014 n = bufio.readinto(b)
2015 return bytes(b[:n])
2016 self.check_flush_and_read(_readinto)
2017
2018 def test_flush_and_peek(self):
2019 def _peek(bufio, n=-1):
2020 # This relies on the fact that the buffer can contain the whole
2021 # raw stream, otherwise peek() can return less.
2022 b = bufio.peek(n)
2023 if n != -1:
2024 b = b[:n]
2025 bufio.seek(len(b), 1)
2026 return b
2027 self.check_flush_and_read(_peek)
2028
2029 def test_flush_and_write(self):
2030 raw = self.BytesIO(b"abcdefghi")
2031 bufio = self.tp(raw)
2032
2033 bufio.write(b"123")
2034 bufio.flush()
2035 bufio.write(b"45")
2036 bufio.flush()
2037 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002038 self.assertEqual(b"12345fghi", raw.getvalue())
2039 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002040
2041 def test_threads(self):
2042 BufferedReaderTest.test_threads(self)
2043 BufferedWriterTest.test_threads(self)
2044
2045 def test_writes_and_peek(self):
2046 def _peek(bufio):
2047 bufio.peek(1)
2048 self.check_writes(_peek)
2049 def _peek(bufio):
2050 pos = bufio.tell()
2051 bufio.seek(-1, 1)
2052 bufio.peek(1)
2053 bufio.seek(pos, 0)
2054 self.check_writes(_peek)
2055
2056 def test_writes_and_reads(self):
2057 def _read(bufio):
2058 bufio.seek(-1, 1)
2059 bufio.read(1)
2060 self.check_writes(_read)
2061
2062 def test_writes_and_read1s(self):
2063 def _read1(bufio):
2064 bufio.seek(-1, 1)
2065 bufio.read1(1)
2066 self.check_writes(_read1)
2067
2068 def test_writes_and_readintos(self):
2069 def _read(bufio):
2070 bufio.seek(-1, 1)
2071 bufio.readinto(bytearray(1))
2072 self.check_writes(_read)
2073
Antoine Pitroua0ceb732009-08-06 20:29:56 +00002074 def test_write_after_readahead(self):
2075 # Issue #6629: writing after the buffer was filled by readahead should
2076 # first rewind the raw stream.
2077 for overwrite_size in [1, 5]:
2078 raw = self.BytesIO(b"A" * 10)
2079 bufio = self.tp(raw, 4)
2080 # Trigger readahead
2081 self.assertEqual(bufio.read(1), b"A")
2082 self.assertEqual(bufio.tell(), 1)
2083 # Overwriting should rewind the raw stream if it needs so
2084 bufio.write(b"B" * overwrite_size)
2085 self.assertEqual(bufio.tell(), overwrite_size + 1)
2086 # If the write size was smaller than the buffer size, flush() and
2087 # check that rewind happens.
2088 bufio.flush()
2089 self.assertEqual(bufio.tell(), overwrite_size + 1)
2090 s = raw.getvalue()
2091 self.assertEqual(s,
2092 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2093
Antoine Pitrou7c404892011-05-13 00:13:33 +02002094 def test_write_rewind_write(self):
2095 # Various combinations of reading / writing / seeking backwards / writing again
2096 def mutate(bufio, pos1, pos2):
2097 assert pos2 >= pos1
2098 # Fill the buffer
2099 bufio.seek(pos1)
2100 bufio.read(pos2 - pos1)
2101 bufio.write(b'\x02')
2102 # This writes earlier than the previous write, but still inside
2103 # the buffer.
2104 bufio.seek(pos1)
2105 bufio.write(b'\x01')
2106
2107 b = b"\x80\x81\x82\x83\x84"
2108 for i in range(0, len(b)):
2109 for j in range(i, len(b)):
2110 raw = self.BytesIO(b)
2111 bufio = self.tp(raw, 100)
2112 mutate(bufio, i, j)
2113 bufio.flush()
2114 expected = bytearray(b)
2115 expected[j] = 2
2116 expected[i] = 1
2117 self.assertEqual(raw.getvalue(), expected,
2118 "failed result for i=%d, j=%d" % (i, j))
2119
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00002120 def test_truncate_after_read_or_write(self):
2121 raw = self.BytesIO(b"A" * 10)
2122 bufio = self.tp(raw, 100)
2123 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2124 self.assertEqual(bufio.truncate(), 2)
2125 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2126 self.assertEqual(bufio.truncate(), 4)
2127
def test_misbehaved_io(self):
    # Run the reader flavour, then the writer flavour, of this test
    # against the current instance.
    for base in (BufferedReaderTest, BufferedWriterTest):
        base.test_misbehaved_io(self)
2131
def test_interleaved_read_write(self):
    # Test for issue #12213: alternate single-byte writes with each of
    # the reading methods and check the final raw contents.
    with self.BytesIO(b'abcdefgh') as raw, self.tp(raw, 100) as stream:
        stream.write(b"1")
        self.assertEqual(stream.read(1), b'b')
        stream.write(b'2')
        self.assertEqual(stream.read1(1), b'd')
        stream.write(b'3')
        one_byte = bytearray(1)
        stream.readinto(one_byte)
        self.assertEqual(one_byte, b'f')
        stream.write(b'4')
        self.assertEqual(stream.peek(1), b'h')
        stream.flush()
        self.assertEqual(raw.getvalue(), b'1b2d3f4h')

    # Same idea, but starting with a read instead of a write.
    with self.BytesIO(b'abc') as raw, self.tp(raw, 100) as stream:
        self.assertEqual(stream.read(1), b'a')
        stream.write(b"2")
        self.assertEqual(stream.read(1), b'c')
        stream.flush()
        self.assertEqual(raw.getvalue(), b'a2c')
2156
def test_interleaved_readline_write(self):
    # Alternate one-byte writes with readline() calls; each write
    # replaces the first byte of the line that is read next.
    with self.BytesIO(b'ab\ncdef\ng\n') as raw, self.tp(raw) as stream:
        steps = (
            (b'1', b'b\n'),
            (b'2', b'def\n'),
            (b'3', b'\n'),
        )
        for written, rest_of_line in steps:
            stream.write(written)
            self.assertEqual(stream.readline(), rest_of_line)
        stream.flush()
        self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2168
# A BufferedRandom cannot be constructed over a non-seekable stream, so
# there is nothing to test here.
# NOTE(review): presumably this masks a test_unseekable inherited from a
# base class defined above this chunk -- confirm.
test_unseekable = None
2171
R David Murray67bfe802013-02-23 21:51:05 -05002172
# Runs the BufferedRandomTest suite (plus SizeofTest) against
# io.BufferedRandom.
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # On 32-bit builds the huge allocation below can actually succeed
        # (e.g. with more than 2GB RAM and a 64-bit kernel), so the check
        # is only made when sys.maxsize is larger than 2**31 - 1.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            bufio.__init__(rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Run both the reader and the writer garbage collection tests.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275: the TypeError raised for too many arguments must
        # mention "BufferedRandom".
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
2194
2195
# Runs the BufferedRandomTest suite against pyio.BufferedRandom.
class PyBufferedRandomTest(BufferedRandomTest):
    tp = pyio.BufferedRandom
2198
2199
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002200# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2201# properties:
2202# - A single output character can correspond to many bytes of input.
2203# - The number of input bytes to complete the character can be
2204# undetermined until the last input byte is received.
2205# - The number of input bytes can vary depending on previous input.
2206# - A single input byte can correspond to many characters of output.
2207# - The number of output characters can be undetermined until the
2208# last input byte is received.
2209# - The number of output characters can vary depending on previous input.
2210
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    Stateful, buffering decoder for testing seek/tell behavior.

    The input is a stream of words.  While I is non-zero a word is exactly
    I bytes long; while I is 0 a word is terminated by a period, and empty
    words (extra periods) are skipped.  Words are interpreted as follows:
      - 'i' followed by an optional number sets the input length, I
        (maximum 99; no number means 0).
      - 'o' followed by an optional number sets the output length, O
        (maximum 99; no number means 0).
      - Any other word is written to the output followed by a period.
        When O is non-zero the word is first padded with hyphens, or
        truncated, to O characters; when O is 0 it is output verbatim.
    I and O are both 1 after reset().  A decode() call with final=True
    also terminates a partially buffered word.
    """

    # lookupTestDecoder() only answers while this flag is true.
    codecEnabled = False

    def __init__(self, errors='strict'):
        super().__init__(errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Forget buffered input and restore I = O = 1."""
        self.i = self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) with I and O packed into flags."""
        # XOR with 1 so that flags == 0 right after reset().
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        """Restore a state previously returned by getstate()."""
        pending, flags = state
        self.buffer = bytearray(pending)
        packed_i, packed_o = divmod(flags, 100)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input* and return the text of completed words."""
        period = ord('.')
        pieces = []
        for byte in input:
            if self.i != 0:
                # Fixed-length mode: a word ends after self.i bytes.
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
            elif byte != period:
                # Variable-length mode: keep collecting until a period.
                self.buffer.append(byte)
            elif self.buffer:
                pieces.append(self.process_word())
        if final and self.buffer:
            # EOF terminates the last word.
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word; return its output text ('' for i/o)."""
        output = ''
        command = self.buffer[0]
        if command == ord('i'):
            # Set the input length.
            self.i = min(99, int(self.buffer[1:] or 0))
        elif command == ord('o'):
            # Set the output length.
            self.o = min(99, int(self.buffer[1:] or 0))
        else:
            word = self.buffer.decode('ascii')
            if self.o:
                # Pad out with hyphens, then truncate to the output length.
                word = word.ljust(self.o, '-')[:self.o]
            output = word + '.'
        self.buffer = bytearray()
        return output

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: resolve 'test_decoder' while enabled."""
        if not (cls.codecEnabled and name == 'test_decoder'):
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder',
            encode=latin1.encode,
            decode=None,
            incrementalencoder=None,
            streamreader=None,
            streamwriter=None,
            incrementaldecoder=cls,
        )
2295
# Make the decoder above reachable through the codec registry.  The lookup
# function only answers while StatefulIncrementalDecoder.codecEnabled is
# true, which is off by default; tests that need it switch it on.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2299
2300
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Sanity checks for the StatefulIncrementalDecoder itself.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.'),
    ]

    def test_decoder(self):
        # One-shot decoding of every test case with a fresh decoder.
        for data, final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, final), expected)

        # An unfinished decode yields nothing until EOF is forced.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', True), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002343
2344class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002345
def setUp(self):
    # Sample data with mixed line endings, and the same text with every
    # line ending normalized to "\n".
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    normalized_bytes = b"AAA\nBBB\nCCC\nDDD\nEEE\n"
    self.normalized = normalized_bytes.decode("ascii")
    # Start every test without a leftover test file.
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002350
def tearDown(self):
    # Remove the test file a test may have created.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002353
def test_constructor(self):
    # Re-running __init__ on an existing wrapper must take the new
    # arguments into account and reject invalid newline values.
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)

    text.__init__(buffered, encoding="latin-1", newline="\r\n")
    self.assertEqual(text.encoding, "latin-1")
    self.assertEqual(text.line_buffering, False)

    text.__init__(buffered, encoding="utf-8", line_buffering=True)
    self.assertEqual(text.encoding, "utf-8")
    self.assertEqual(text.line_buffering, True)
    self.assertEqual("\xe9\n", text.readline())

    with self.assertRaises(TypeError):
        text.__init__(buffered, newline=42)
    with self.assertRaises(ValueError):
        text.__init__(buffered, newline='xyzzy')
2367
def test_uninitialized(self):
    wrapper_cls = self.TextIOWrapper
    # Create an instance without calling __init__ and drop it at once.
    text = wrapper_cls.__new__(wrapper_cls)
    del text
    # A second uninitialized instance must fail cleanly on use...
    text = wrapper_cls.__new__(wrapper_cls)
    with self.assertRaises(Exception):
        repr(text)
    with self.assertRaisesRegex((ValueError, AttributeError),
                                'uninitialized|has no attribute'):
        text.read(0)
    # ...and work normally once __init__ has been called.
    text.__init__(self.MockRawIO())
    self.assertEqual(text.read(0), '')
2378
def test_non_text_encoding_codecs_are_rejected(self):
    # The constructor must complain when given a codec that is not
    # marked as a text encoding.
    # http://bugs.python.org/issue20404
    buffered = self.BufferedWriter(self.BytesIO())
    with self.assertRaisesRegex(LookupError, "is not a text encoding"):
        self.TextIOWrapper(buffered, encoding="hex")
2387
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)

    # detach() hands back the underlying buffer object.
    text = self.TextIOWrapper(buffered)
    self.assertIs(text.detach(), buffered)

    # Pending text is written out by detach(); a second detach() fails.
    text = self.TextIOWrapper(buffered, encoding="ascii")
    text.write("howdy")
    self.assertFalse(raw.getvalue())
    text.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    with self.assertRaises(ValueError):
        text.detach()

    # Operations independent of the detached stream should still work
    repr(text)
    self.assertEqual(text.encoding, "ascii")
    self.assertEqual(text.errors, "strict")
    self.assertFalse(text.line_buffering)
2406
def test_repr(self):
    # repr() must include the name and mode once they become available.
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    expected = "<%s.TextIOWrapper encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    raw.name = "dummy"
    expected = "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    text.mode = "r"
    expected = ("<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>"
                % modname)
    self.assertEqual(repr(text), expected)

    raw.name = b"dummy"
    expected = ("<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>"
                % modname)
    self.assertEqual(repr(text), expected)

    text.buffer.detach()
    repr(text)  # Should not raise an exception
2426
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)
    # (text written, raw contents expected right after the write)
    steps = (
        ("X", b""),                  # No flush happened
        ("Y\nZ", b"XY\nZ"),          # All got flushed
        ("A\rB", b"XY\nZA\rB"),
    )
    for chunk, flushed in steps:
        text.write(chunk)
        self.assertEqual(raw.getvalue(), flushed)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002437
def test_default_encoding(self):
    saved_environ = dict(os.environ)
    try:
        # Remove the locale variables so that the user preferred encoding
        # may differ from the current locale encoding; TextIOWrapper()
        # must use the current locale encoding, not the user preferred
        # one.
        for name in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(name, None)

        current_locale_encoding = locale.getpreferredencoding(False)
        text = self.TextIOWrapper(self.BytesIO())
        self.assertEqual(text.encoding, current_locale_encoding)
    finally:
        # Put the environment back exactly as it was.
        os.environ.clear()
        os.environ.update(saved_environ)
2455
@support.cpython_only
def test_device_encoding(self):
    # Issue 15989: a fileno() result that does not fit in a C int must
    # make the constructor raise OverflowError.
    import _testcapi
    stream = self.BytesIO()
    for too_big in (_testcapi.INT_MAX + 1, _testcapi.UINT_MAX + 1):
        stream.fileno = lambda value=too_big: value
        self.assertRaises(OverflowError, self.TextIOWrapper, stream)
2465
def test_encoding(self):
    # The encoding attribute must always be set, and must be valid.
    raw = self.BytesIO()
    explicit = self.TextIOWrapper(raw, encoding="utf-8")
    self.assertEqual(explicit.encoding, "utf-8")
    implicit = self.TextIOWrapper(raw)
    self.assertIsNotNone(implicit.encoding)
    # lookup() raises if the default encoding is not a known codec.
    codecs.lookup(implicit.encoding)
2474
def test_encoding_errors_reading(self):
    payload = b"abc\n\xff\n"
    # (1) default and (2) explicit strict: the bad byte raises.
    for extra in ({}, {"errors": "strict"}):
        text = self.TextIOWrapper(self.BytesIO(payload), encoding="ascii",
                                  **extra)
        self.assertRaises(UnicodeError, text.read)
    # (3) ignore and (4) replace: the bad byte is dropped or substituted.
    lenient = (
        ("ignore", "abc\n\n"),
        ("replace", "abc\n\ufffd\n"),
    )
    for handler, decoded in lenient:
        text = self.TextIOWrapper(self.BytesIO(payload), encoding="ascii",
                                  errors=handler)
        self.assertEqual(text.read(), decoded)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002492
def test_encoding_errors_writing(self):
    # (1) default and (2) explicit strict: unencodable text raises.
    for extra in ({}, {"errors": "strict"}):
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", **extra)
        self.assertRaises(UnicodeError, text.write, "\xff")
    # (3) ignore and (4) replace: the character is dropped or becomes "?".
    lenient = (
        ("ignore", b"abcdef\n"),
        ("replace", b"abc?def\n"),
    )
    for handler, stored in lenient:
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", errors=handler,
                                  newline="\n")
        text.write("abc\xffdef\n")
        text.flush()
        self.assertEqual(raw.getvalue(), stored)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002516
def test_newlines(self):
    input_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]
    source_text = ''.join(input_lines)

    # [newline argument, lines expected when reading source_text]
    tests = [
        [None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']],
        ['', input_lines],
        ['\n', ["unix\n", "windows\r\n", "os9\rlast\n", "nonl"]],
        ['\r\n', ["unix\nwindows\r\n", "os9\rlast\nnonl"]],
        ['\r', ["unix\nwindows\r", "\nos9\r", "last\nnonl"]],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    def collect_lines(textio, do_reads):
        # Either iterate over the stream, or mix read(2) with readline().
        if not do_reads:
            return list(textio)
        lines = []
        while True:
            head = textio.read(2)
            if head == '':
                break
            self.assertEqual(len(head), 2)
            lines.append(head + textio.readline())
        return lines

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        data = source_text.encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    got_lines = collect_lines(textio, do_reads)

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002558
def test_newlines_input(self):
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    # (newline argument, lines expected from readlines())
    cases = [
        (None, normalized.decode("ascii").splitlines(keepends=True)),
        ("", testdata.decode("ascii").splitlines(keepends=True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        raw = self.BytesIO(testdata)
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        self.assertEqual(text.readlines(), expected)
        # Reading everything at once must give the same text.
        text.seek(0)
        self.assertEqual(text.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002574
def test_newlines_output(self):
    # newline argument -> bytes expected in the underlying stream
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is expected to behave like newline=os.linesep.
    tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
    chunks = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in tests:
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        for chunk in chunks:
            text.write(chunk)
        text.flush()
        self.assertEqual(raw.closed, False)
        self.assertEqual(raw.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002592
def test_destructor(self):
    # Dropping the last reference to the wrapper must close the
    # underlying stream after the pending text has been written to it.
    seen_at_close = []
    base = self.BytesIO

    class RecordingBytesIO(base):
        def close(self):
            # Remember what the stream held when close() was called.
            seen_at_close.append(self.getvalue())
            base.close(self)

    raw = RecordingBytesIO()
    text = self.TextIOWrapper(raw, encoding="ascii")
    text.write("abc")
    del text
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002606
def test_override_destructor(self):
    # Destroying the wrapper must go through the overridden __del__,
    # close() and flush(), in that order.
    calls = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # Chain to the parent's __del__ only if it defines one.
            parent_del = getattr(super(), '__del__', None)
            if parent_del is not None:
                parent_del()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    raw = self.BytesIO()
    text = MyTextIO(raw, encoding="ascii")
    del text
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2629
def test_error_through_destructor(self):
    # The exception state must not be modified by a destructor, even if
    # close() fails.
    rawio = self.CloseFailureIO()

    def access_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, access_missing_attribute)
    report = captured.getvalue().strip()
    if not report:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(report.splitlines()), 1)
    self.assertTrue(report.startswith("Exception OSError: "), report)
    self.assertTrue(report.endswith(" ignored"), report)
Guido van Rossum8358db22007-08-18 21:39:55 +00002644
Guido van Rossum9b76da62007-04-11 01:09:03 +00002645 # Systematic tests of the text I/O API
2646
def test_basic_io(self):
    # Write, read, seek and tell on a real file for several chunk sizes
    # and encodings.  The files are opened with "with" so that a failing
    # assertion cannot leave the test file open.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        # NOTE: "utf-16-be" and "utf-16-le" were disabled in the original
        # list of encodings and stay disabled here.
        for enc in "ascii", "latin-1", "utf-8":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position of the end of the file.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                # Append, then read the appended text back.
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2675
def multi_line_test(self, f, enc):
    # Helper: replace the contents of f with lines of various lengths,
    # recording tell() before each write, then check that readline()
    # returns the same lines at the same positions.  (enc is not used.)
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    period = len(sample)
    wlines = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        body = "".join([sample[k % period] for k in range(size)])
        line = body + "\n"
        wlines.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    rlines = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        rlines.append((pos, line))
    self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002697
def test_telling(self):
    # tell() must give the same positions while reading as it gave while
    # writing, and must raise OSError during line iteration.  The file is
    # opened with "with" so that a failing assertion cannot leave it open.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to fail while iterating over the file.
            self.assertRaises(OSError, f.tell)
        # Once the iteration is over, tell() works again.
        self.assertEqual(f.tell(), p2)
2717
def test_seeking(self):
    # Build a line whose multi-byte suffix starts two bytes before the
    # default chunk size.
    prefix_size = _default_chunk_size() - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    line = prefix + u_suffix.encode("utf-8")
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        head = f.read(prefix_size)
        self.assertEqual(head, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002734
def test_seeking_too(self):
    # Regression test for a specific bug: tell() after readline() with a
    # chunk size smaller than the three-byte character in the file.
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as out:
        out.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as text:
        text._CHUNK_SIZE  # Just test that it exists
        text._CHUNK_SIZE = 2
        text.readline()
        text.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002745
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened with "with" so that a failing assertion
        # cannot leave the test file open.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Seeking back to the cookie must restore the
                    # decoder so the rest reads identically.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = True

    # Run the tests.
    try:
        # Try each test case.
        for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(input)

        # Position each test case so that it crosses a chunk boundary.
        for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = False
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002792
def test_encoded_writes(self):
    # Write the same text twice through a TextIOWrapper for each
    # UTF-16/UTF-32 variant, then verify the text and the raw bytes.
    chunk = "1234567890"
    expected_text = chunk * 2
    for encoding in ("utf-16", "utf-16-le", "utf-16-be",
                     "utf-32", "utf-32-le", "utf-32-be"):
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding=encoding)
        # Check if the BOM is written only once (see issue1753).
        for _ in range(2):
            wrapper.write(chunk)
        # Read everything back from the start, twice.
        for _ in range(2):
            wrapper.seek(0)
            self.assertEqual(wrapper.read(), expected_text)
        self.assertEqual(raw.getvalue(), expected_text.encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002812
def test_unreadable(self):
    # read() on a wrapper whose buffer reports readable() == False
    # must raise OSError.
    class NotReadable(self.BytesIO):
        def readable(self):
            return False

    wrapper = self.TextIOWrapper(NotReadable())
    with self.assertRaises(OSError):
        wrapper.read()
Benjamin Petersona1b49012009-03-31 23:11:32 +00002819
def test_read_one_by_one(self):
    # Read a single character at a time until read() returns "" and
    # check that "\r\n" came back as "\n".
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    chars = iter(lambda: wrapper.read(1), "")
    self.assertEqual("".join(chars), "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002829
def test_readlines(self):
    # readlines() with no argument, with None and with a size hint.
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]
    self.assertEqual(wrapper.readlines(), every_line)
    for hint, expected in ((None, every_line), (5, ["AA\n", "BB\n"])):
        wrapper.seek(0)
        self.assertEqual(wrapper.readlines(hint), expected)
2837
# read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    wrapper = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    # Keep reading 128 characters until read() returns "".
    pieces = iter(lambda: wrapper.read(128), "")
    self.assertEqual("".join(pieces), "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002849
def test_writelines(self):
    # writelines() with a plain list writes the items back to back.
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw)
    wrapper.writelines(['ab', 'cd', 'ef'])
    wrapper.flush()
    self.assertEqual(raw.getvalue(), b'abcdef')
2857
def test_writelines_userlist(self):
    # Same as test_writelines, but the lines come in a UserList.
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw)
    lines = UserList(['ab', 'cd', 'ef'])
    wrapper.writelines(lines)
    wrapper.flush()
    self.assertEqual(raw.getvalue(), b'abcdef')
2865
def test_writelines_error(self):
    # writelines() must raise TypeError for a list of ints, for None
    # and for a bytes object.
    wrapper = self.TextIOWrapper(self.BytesIO())
    for bad_lines in ([1, 2, 3], None, b'abc'):
        with self.assertRaises(TypeError):
            wrapper.writelines(bad_lines)
2871
def test_issue1395_1(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")

    # read one char at a time, stopping when read() returns ""
    chars = iter(lambda: wrapper.read(1), "")
    self.assertEqual("".join(chars), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002883
def test_issue1395_2(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # read four chars at a time, stopping when read() returns ""
    pieces = iter(lambda: wrapper.read(4), "")
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002895
def test_issue1395_3(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # Two sized reads followed by three readline() calls; the list
    # display evaluates the calls left to right.
    pieces = [
        wrapper.read(4),
        wrapper.read(4),
        wrapper.readline(),
        wrapper.readline(),
        wrapper.readline(),
    ]
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002906
def test_issue1395_4(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # One sized read, then read() for everything that is left.
    head = wrapper.read(4)
    tail = wrapper.read()
    self.assertEqual(head + tail, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002914
def test_issue1395_5(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # Advance by four characters, remember the position, rewind and
    # seek back; the next read must continue from that position.
    wrapper.read(4)
    cookie = wrapper.tell()
    wrapper.seek(0)
    wrapper.seek(cookie)
    self.assertEqual(wrapper.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002924
def test_issue2282(self):
    # The wrapper must report the same seekable() value as its buffer.
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    self.assertEqual(raw.seekable(), wrapper.seekable())
2930
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN

    def assert_file_bytes(expected):
        # Compare the raw file content with *expected*.
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), expected)

    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as out:
            out.write('aaa')
            out.tell()
        assert_file_bytes('aaa'.encode(charset))

        with self.open(path, 'a', encoding=charset) as out:
            out.write('xxx')
        assert_file_bytes('aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002945
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as out:
            out.write('aaa')
            end_cookie = out.tell()
        with self.open(path, 'r+', encoding=charset) as update:
            # Write at the saved end position first, then overwrite
            # the text at the start.
            update.seek(end_cookie)
            update.write('zzz')
            update.seek(0)
            update.write('bbb')
        with self.open(path, 'rb') as raw:
            content = raw.read()
            self.assertEqual(content, 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002960
def test_seek_append_bom(self):
    # Same test, but first seek to the start and then to the end
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as out:
            out.write('aaa')
        with self.open(path, 'a', encoding=charset) as appender:
            appender.seek(0)
            appender.seek(0, self.SEEK_END)
            appender.write('xxx')
        with self.open(path, 'rb') as raw:
            content = raw.read()
            self.assertEqual(content, 'aaaxxx'.encode(charset))
2973
def test_errors_property(self):
    # The errors attribute is "strict" when not given, otherwise it
    # is the value passed to open().
    path = support.TESTFN
    with self.open(path, "w") as default_file:
        self.assertEqual(default_file.errors, "strict")
    with self.open(path, "w", errors="replace") as replacing_file:
        self.assertEqual(replacing_file.errors, "replace")
2979
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    start = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as out:
        def worker(index):
            # Build the line first, then block until the event is set.
            line = "Thread%03d\n" % index
            start.wait()
            out.write(line)

        workers = []
        for index in range(20):
            workers.append(threading.Thread(target=worker, args=(index,)))
        with support.start_threads(workers, start.set):
            time.sleep(0.02)
    with self.open(support.TESTFN) as result:
        content = result.read()
        # Each thread's line must appear exactly once.
        for index in range(20):
            self.assertEqual(content.count("Thread%03d\n" % index), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002998
def test_flush_error_on_close(self):
    # Test that text file is closed despite failed flush
    # and that flush() is called before file closed.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    # Filled by failing_flush() with the closed flags seen at flush time.
    seen_at_flush = []

    def failing_flush():
        seen_at_flush[:] = [wrapper.closed, wrapper.buffer.closed]
        raise OSError()

    wrapper.flush = failing_flush
    with self.assertRaises(OSError):  # exception not swallowed
        wrapper.close()
    self.assertTrue(wrapper.closed)
    self.assertTrue(wrapper.buffer.closed)
    self.assertTrue(seen_at_flush)      # flush() called
    self.assertFalse(seen_at_flush[0])  # flush() called before file closed
    self.assertFalse(seen_at_flush[1])
    wrapper.flush = lambda: None  # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003015
def test_close_error_on_close(self):
    # Both flush() and the buffer's close() fail: the close error must
    # propagate with the flush error as its __context__.
    raw = self.BytesIO(self.testdata)

    def failing_flush():
        raise OSError('flush')

    def failing_close():
        raise OSError('close')

    raw.close = failing_close
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.flush = failing_flush
    with self.assertRaises(OSError) as caught:  # exception not swallowed
        wrapper.close()
    error = caught.exception
    self.assertEqual(error.args, ('close',))
    context = error.__context__
    self.assertIsInstance(context, OSError)
    self.assertEqual(context.args, ('flush',))
    self.assertFalse(wrapper.closed)
3031
def test_nonnormalized_close_error_on_close(self):
    # Issue #21677
    raw = self.BytesIO(self.testdata)

    # The names raised below are deliberately undefined: evaluating
    # them produces the NameErrors this test checks for.
    def failing_flush():
        raise non_existing_flush

    def failing_close():
        raise non_existing_close

    raw.close = failing_close
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.flush = failing_flush
    with self.assertRaises(NameError) as caught:  # exception not swallowed
        wrapper.close()
    error = caught.exception
    self.assertIn('non_existing_close', str(error))
    context = error.__context__
    self.assertIsInstance(context, NameError)
    self.assertIn('non_existing_flush', str(context))
    self.assertFalse(wrapper.closed)
3048
def test_multi_close(self):
    # close() may be called repeatedly; flush() afterwards must raise
    # ValueError.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    for _ in range(3):
        wrapper.close()
    with self.assertRaises(ValueError):
        wrapper.flush()
3055
def test_unseekable(self):
    # tell() and seek() on a wrapper around MockUnseekableIO must raise
    # UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.tell()
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.seek(0)
3060
def test_readonly_attributes(self):
    # Assigning to the buffer attribute must raise AttributeError.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    with self.assertRaises(AttributeError):
        wrapper.buffer = replacement
3066
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks),
                                 encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(wrapper.read(4), 'abcd')
    self.assertEqual(wrapper.readline(), 'efghi\n')
    remaining = list(wrapper)
    self.assertEqual(remaining, ['jkl\n', 'opq\n'])
3077
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for text in ('1', '23\n4', '5'):
        wrapper.write(text)
    written = b''.join(raw._write_stack)
    self.assertEqual(written, b'123\n45')
3087
def test_bufio_write_through(self):
    # Issue #21396: write_through=True doesn't force a flush()
    # on the underlying binary buffered object.
    flush_calls = []
    write_calls = []

    class RecordingWriter(self.BufferedWriter):
        # Records each flush()/write() call before delegating.
        def flush(self, *args, **kwargs):
            flush_calls.append(True)
            return super().flush(*args, **kwargs)

        def write(self, *args, **kwargs):
            write_calls.append(True)
            return super().write(*args, **kwargs)

    data = b"a"
    text = data.decode('ascii')
    rawio = self.BytesIO()
    bufio = RecordingWriter(rawio, len(data)*2)
    textio = self.TextIOWrapper(bufio, encoding='ascii',
                                write_through=True)
    # write to the buffered io but don't overflow the buffer
    textio.write(text)

    # buffer.flush is not called with write_through=True
    self.assertFalse(flush_calls)
    # buffer.write *is* called with write_through=True
    self.assertTrue(write_calls)
    self.assertEqual(rawio.getvalue(), b"")  # no flush

    del write_calls[:]  # reset
    textio.write(text * 10)  # total content is larger than bufio buffer
    self.assertTrue(write_calls)
    self.assertEqual(rawio.getvalue(), data * 11)  # all flushed
3119
def test_read_nonbytes(self):
    # Issue #17106
    # Crash when underlying read() returns non-bytes
    # A fresh wrapper around a StringIO is built for each call.
    for method, args in (("read", (1,)), ("readline", ()), ("read", ())):
        wrapper = self.TextIOWrapper(self.StringIO('a'))
        with self.assertRaises(TypeError):
            getattr(wrapper, method)(*args)
3129
def test_illegal_decoder(self):
    # Issue #17106
    # Bypass the early encoding check added in issue 20404
    def _make_illegal_wrapper():
        # Flag the quopri codec as a text encoding only while the
        # wrapper is being constructed.
        codec_info = codecs.lookup("quopri")
        codec_info._is_text_encoding = True
        try:
            return self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
                                      newline='\n', encoding="quopri")
        finally:
            codec_info._is_text_encoding = False

    # Crash when decoder returns non-string
    for method, args in (("read", (1,)), ("readline", ()), ("read", ())):
        wrapper = _make_illegal_wrapper()
        with self.assertRaises(TypeError):
            getattr(wrapper, method)(*args)
3149
def _check_create_at_shutdown(self, **kwargs):
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    # The script below is run through assert_python_ok(); it builds a
    # TextIOWrapper with *kwargs* inside C.__del__ and prints "ok".
    script = """if 1:
            import codecs
            import {iomod} as io

            # Avoid looking up codecs at shutdown
            codecs.lookup('utf-8')

            class C:
                def __init__(self):
                    self.buf = io.BytesIO()
                def __del__(self):
                    io.TextIOWrapper(self.buf, **{kwargs})
                    print("ok")
            c = C()
            """.format(iomod=self.io.__name__, kwargs=kwargs)
    return assert_python_ok("-c", script)
3170
@support.requires_type_collecting
def test_create_at_shutdown_without_encoding(self):
    _, stdout, stderr = self._check_create_at_shutdown()
    if not stderr:
        self.assertEqual("ok", stdout.decode().strip())
        return
    # Can error out with a RuntimeError if the module state
    # isn't found.
    self.assertIn(self.shutdown_error, stderr.decode())
3180
@support.requires_type_collecting
def test_create_at_shutdown_with_encoding(self):
    # With explicit encoding and errors the child must print "ok" and
    # write nothing to stderr.
    options = dict(encoding='utf-8', errors='strict')
    _, stdout, stderr = self._check_create_at_shutdown(**options)
    self.assertFalse(stderr)
    self.assertEqual("ok", stdout.decode().strip())
3187
def test_read_byteslike(self):
    source = MemviewBytesIO(b'Just some random string\n')
    wrapper = self.TextIOWrapper(source, 'utf-8')

    # TextIOwrapper will not read the full string, because
    # we truncate it to a multiple of the native int size
    # so that we can construct a more complex memoryview.
    truncated = _to_memoryview(source.getvalue()).tobytes()
    expected = truncated.decode('utf-8')

    self.assertEqual(wrapper.read(200), expected)
3198
def test_issue22849(self):
    # Build wrappers around a minimal object that initially has no
    # tell(); errors from those attempts are deliberately ignored.
    class F(object):
        def readable(self):
            return True

        def writable(self):
            return True

        def seekable(self):
            return True

    for _ in range(10):
        try:
            self.TextIOWrapper(F(), encoding='utf-8')
        except Exception:
            pass

    # Once tell() is available, construction must not raise.
    F.tell = lambda self: 0
    wrapper = self.TextIOWrapper(F(), encoding='utf-8')
3213
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003214
class MemviewBytesIO(io.BytesIO):
    '''BytesIO variant whose read() and read1() hand back the result of
       _to_memoryview() instead of a bytes object.'''

    def read1(self, len_):
        chunk = super().read1(len_)
        return _to_memoryview(chunk)

    def read(self, len_):
        chunk = super().read(len_)
        return _to_memoryview(chunk)
3224
def _to_memoryview(buf):
    '''Return a memoryview over an array of C ints built from *buf*.

    Trailing bytes that do not fill a whole array item are dropped.
    '''
    ints = array.array('i')
    whole_items = len(buf) // ints.itemsize
    usable = whole_items * ints.itemsize
    ints.frombytes(buf[:usable])
    return memoryview(ints)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003232
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003233
class CTextIOWrapperTest(TextIOWrapperTest):
    # TextIOWrapper tests bound to the ``io`` module.
    io = io
    # Message looked for in the child's stderr by
    # test_create_at_shutdown_without_encoding.
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        # A failed re-initialization leaves the object unusable, and an
        # object created with __new__ only cannot be repr()'d.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        with self.assertRaises(ValueError):
            wrapper.__init__(buffered, newline='xyzzy')
        with self.assertRaises(ValueError):
            wrapper.read()

        wrapper = self.TextIOWrapper.__new__(self.TextIOWrapper)
        with self.assertRaises(Exception):
            repr(wrapper)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            rawio = io.FileIO(support.TESTFN, "wb")
            buffered = self.BufferedWriter(rawio)
            wrapper = self.TextIOWrapper(buffered, encoding="ascii")
            wrapper.write("456def")
            # Self-reference: creates a cycle through the wrapper.
            wrapper.x = wrapper
            wrapper_ref = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertIsNone(wrapper_ref(), wrapper_ref)
        with self.open(support.TESTFN, "rb") as result:
            self.assertEqual(result.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text1 = self.TextIOWrapper(pair1, encoding="ascii")
            pair2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text2 = self.TextIOWrapper(pair2, encoding="ascii")
            # circular references
            text1.buddy = text2
            text2.buddy = text1
        support.gc_collect()
3278
3279
class PyTextIOWrapperTest(TextIOWrapperTest):
    # TextIOWrapper tests bound to the ``pyio`` module.
    # shutdown_error is the message looked for in the child's stderr by
    # test_create_at_shutdown_without_encoding.
    shutdown_error = "LookupError: unknown encoding: ascii"
    io = pyio
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003283
3284
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Shared checks for IncrementalNewlineDecoder.  The class under test
    # is read from ``self.IncrementalNewlineDecoder``, which is not
    # defined in this class body.

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(raw, expected, **kwargs):
            # We exercise getstate() / setstate() as well as decode():
            # decoding again from the saved state must give the same text.
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(raw, **kwargs), expected)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(raw, **kwargs), expected)

        def _check_sequence(steps):
            # Each step is (input bytes, expected text, decode kwargs);
            # the steps run in order against the same decoder.
            for raw, expected, kwargs in steps:
                _check_decode(raw, expected, **kwargs)

        _check_sequence([
            (b'\xe8\xa2\x88', "\u8888", {}),

            (b'\xe8', "", {}),
            (b'\xa2', "", {}),
            (b'\x88', "\u8888", {}),

            (b'\xe8', "", {}),
            (b'\xa2', "", {}),
            (b'\x88', "\u8888", {}),

            (b'\xe8', "", {}),
        ])
        with self.assertRaises(UnicodeDecodeError):
            decoder.decode(b'', final=True)

        decoder.reset()
        _check_sequence([
            (b'\n', "\n", {}),
            (b'\r', "", {}),
            (b'', "\n", {'final': True}),
            (b'\r', "\n", {'final': True}),

            (b'\r', "", {}),
            (b'a', "\na", {}),

            (b'\r\r\n', "\n\n", {}),
            (b'\r', "", {}),
            (b'\r', "\n", {}),
            (b'\na', "\na", {}),

            (b'\xe8\xa2\x88\r\n', "\u8888\n", {}),
            (b'\xe8\xa2\x88', "\u8888", {}),
            (b'\n', "\n", {}),
            (b'\xe8\xa2\x88\r', "\u8888", {}),
            (b'\n', "\n", {}),
        ])

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to *decoder* one unit at a time and check both the
        # translated output and the ``newlines`` attribute.  With an
        # encoding the units are single encoded bytes; with
        # encoding=None they are single characters.
        result = []
        if encoding is None:
            encoder = None
        else:
            encoder = codecs.getincrementalencoder(encoding)()

        def _decode_piecewise(text):
            if encoder is None:
                # Decode one char at a time
                pieces = text
            else:
                # Decode one byte at a time
                pieces = [bytes([b]) for b in encoder.encode(text)]
            for piece in pieces:
                result.append(decoder.decode(piece))

        self.assertEqual(decoder.newlines, None)
        # (text to feed, value of decoder.newlines expected afterwards)
        steps = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, seen_newlines in steps:
            _decode_piecewise(text)
            self.assertEqual(decoder.newlines, seen_newlines)
        _decode_piecewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")

        # After reset() the decoder must report no newlines again.
        decoder.reset()
        payload = "abc"
        if encoder is not None:
            encoder.reset()
            payload = encoder.encode(payload)
        self.assertEqual(decoder.decode(payload), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                inner = None
            else:
                inner = codecs.getincrementaldecoder(enc)()
            decoder = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(decoder, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(decoder)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # Neither character may be reported as a newline.
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)

        for translate in (False, True):
            _check(self.IncrementalNewlineDecoder(None, translate=translate))
3390
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; every test comes from the base class.
    # NOTE(review): presumably the C IncrementalNewlineDecoder is
    # attached to this class elsewhere in the file -- confirm.
    pass
3393
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; every test comes from the base class.
    # NOTE(review): presumably the pure-Python IncrementalNewlineDecoder
    # is attached to this class elsewhere in the file -- confirm.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003396
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003397
Guido van Rossum01a27522007-03-07 01:00:12 +00003398# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003399
Guido van Rossum5abbf752007-08-27 17:39:33 +00003400class MiscIOTest(unittest.TestCase):
3401
def tearDown(self):
    # Remove the scratch file used by the tests.
    scratch = support.TESTFN
    support.unlink(scratch)
3404
def test___all__(self):
    # Every name in __all__ must resolve to a non-None attribute.
    # Apart from open() and the SEEK_* names, each must be either an
    # Exception subclass or an IOBase subclass.
    module = self.io
    for name in module.__all__:
        obj = getattr(module, name, None)
        self.assertIsNotNone(obj, name)
        if name == "open":
            continue
        if "error" in name.lower() or name == "UnsupportedOperation":
            self.assertTrue(issubclass(obj, Exception), name)
            continue
        if name.startswith("SEEK_"):
            continue
        self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003415
def test_attributes(self):
    # Check the name/mode attributes exposed by the objects returned
    # from open().  Each file is managed by a ``with`` block so that a
    # failing assertion cannot leave it open.
    with self.open(support.TESTFN, "wb", buffering=0) as f:
        self.assertEqual(f.mode, "wb")

    # Only the open() call itself runs under check_warnings().
    with support.check_warnings(('', DeprecationWarning)):
        f = self.open(support.TESTFN, "U")
    with f:
        self.assertEqual(f.name, support.TESTFN)
        self.assertEqual(f.buffer.name, support.TESTFN)
        self.assertEqual(f.buffer.raw.name, support.TESTFN)
        self.assertEqual(f.mode, "U")
        self.assertEqual(f.buffer.mode, "rb")
        self.assertEqual(f.buffer.raw.mode, "rb")

    with self.open(support.TESTFN, "w+") as f:
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # Second file object on f's descriptor, opened with
        # closefd=False; its name is the descriptor number.
        with self.open(f.fileno(), "wb", closefd=False) as g:
            self.assertEqual(g.mode, "wb")
            self.assertEqual(g.raw.mode, "wb")
            self.assertEqual(g.name, f.fileno())
            self.assertEqual(g.raw.name, f.fileno())
3443
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003444 def test_io_after_close(self):
3445 for kwargs in [
3446 {"mode": "w"},
3447 {"mode": "wb"},
3448 {"mode": "w", "buffering": 1},
3449 {"mode": "w", "buffering": 2},
3450 {"mode": "wb", "buffering": 0},
3451 {"mode": "r"},
3452 {"mode": "rb"},
3453 {"mode": "r", "buffering": 1},
3454 {"mode": "r", "buffering": 2},
3455 {"mode": "rb", "buffering": 0},
3456 {"mode": "w+"},
3457 {"mode": "w+b"},
3458 {"mode": "w+", "buffering": 1},
3459 {"mode": "w+", "buffering": 2},
3460 {"mode": "w+b", "buffering": 0},
3461 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003462 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003463 f.close()
3464 self.assertRaises(ValueError, f.flush)
3465 self.assertRaises(ValueError, f.fileno)
3466 self.assertRaises(ValueError, f.isatty)
3467 self.assertRaises(ValueError, f.__iter__)
3468 if hasattr(f, "peek"):
3469 self.assertRaises(ValueError, f.peek, 1)
3470 self.assertRaises(ValueError, f.read)
3471 if hasattr(f, "read1"):
3472 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003473 if hasattr(f, "readall"):
3474 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003475 if hasattr(f, "readinto"):
3476 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003477 if hasattr(f, "readinto1"):
3478 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003479 self.assertRaises(ValueError, f.readline)
3480 self.assertRaises(ValueError, f.readlines)
3481 self.assertRaises(ValueError, f.seek, 0)
3482 self.assertRaises(ValueError, f.tell)
3483 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003484 self.assertRaises(ValueError, f.write,
3485 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003486 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003487 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003488
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003489 def test_blockingioerror(self):
3490 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003491 class C(str):
3492 pass
3493 c = C("")
3494 b = self.BlockingIOError(1, c)
3495 c.b = b
3496 b.c = c
3497 wr = weakref.ref(c)
3498 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003499 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003500 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003501
3502 def test_abcs(self):
3503 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003504 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3505 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3506 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3507 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003508
3509 def _check_abc_inheritance(self, abcmodule):
3510 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003511 self.assertIsInstance(f, abcmodule.IOBase)
3512 self.assertIsInstance(f, abcmodule.RawIOBase)
3513 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3514 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003515 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003516 self.assertIsInstance(f, abcmodule.IOBase)
3517 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3518 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3519 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003520 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003521 self.assertIsInstance(f, abcmodule.IOBase)
3522 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3523 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3524 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003525
3526 def test_abc_inheritance(self):
3527 # Test implementations inherit from their respective ABCs
3528 self._check_abc_inheritance(self)
3529
3530 def test_abc_inheritance_official(self):
3531 # Test implementations inherit from the official ABCs of the
3532 # baseline "io" module.
3533 self._check_abc_inheritance(io)
3534
Antoine Pitroue033e062010-10-29 10:38:18 +00003535 def _check_warn_on_dealloc(self, *args, **kwargs):
3536 f = open(*args, **kwargs)
3537 r = repr(f)
3538 with self.assertWarns(ResourceWarning) as cm:
3539 f = None
3540 support.gc_collect()
3541 self.assertIn(r, str(cm.warning.args[0]))
3542
3543 def test_warn_on_dealloc(self):
3544 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3545 self._check_warn_on_dealloc(support.TESTFN, "wb")
3546 self._check_warn_on_dealloc(support.TESTFN, "w")
3547
3548 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3549 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003550 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003551 for fd in fds:
3552 try:
3553 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003554 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003555 if e.errno != errno.EBADF:
3556 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003557 self.addCleanup(cleanup_fds)
3558 r, w = os.pipe()
3559 fds += r, w
3560 self._check_warn_on_dealloc(r, *args, **kwargs)
3561 # When using closefd=False, there's no warning
3562 r, w = os.pipe()
3563 fds += r, w
Serhiy Storchaka94a619d2016-02-11 13:11:44 +02003564 with support.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00003565 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00003566
3567 def test_warn_on_dealloc_fd(self):
3568 self._check_warn_on_dealloc_fd("rb", buffering=0)
3569 self._check_warn_on_dealloc_fd("rb")
3570 self._check_warn_on_dealloc_fd("r")
3571
3572
Antoine Pitrou243757e2010-11-05 21:15:39 +00003573 def test_pickling(self):
3574 # Pickling file objects is forbidden
3575 for kwargs in [
3576 {"mode": "w"},
3577 {"mode": "wb"},
3578 {"mode": "wb", "buffering": 0},
3579 {"mode": "r"},
3580 {"mode": "rb"},
3581 {"mode": "rb", "buffering": 0},
3582 {"mode": "w+"},
3583 {"mode": "w+b"},
3584 {"mode": "w+b", "buffering": 0},
3585 ]:
3586 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3587 with self.open(support.TESTFN, **kwargs) as f:
3588 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3589
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003590 def test_nonblock_pipe_write_bigbuf(self):
3591 self._test_nonblock_pipe_write(16*1024)
3592
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003593 def test_nonblock_pipe_write_smallbuf(self):
3594 self._test_nonblock_pipe_write(1024)
3595
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003596 @unittest.skipUnless(hasattr(os, 'set_blocking'),
3597 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003598 def _test_nonblock_pipe_write(self, bufsize):
3599 sent = []
3600 received = []
3601 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003602 os.set_blocking(r, False)
3603 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003604
3605 # To exercise all code paths in the C implementation we need
3606 # to play with buffer sizes. For instance, if we choose a
3607 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3608 # then we will never get a partial write of the buffer.
3609 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3610 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3611
3612 with rf, wf:
3613 for N in 9999, 73, 7574:
3614 try:
3615 i = 0
3616 while True:
3617 msg = bytes([i % 26 + 97]) * N
3618 sent.append(msg)
3619 wf.write(msg)
3620 i += 1
3621
3622 except self.BlockingIOError as e:
3623 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003624 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003625 sent[-1] = sent[-1][:e.characters_written]
3626 received.append(rf.read())
3627 msg = b'BLOCKED'
3628 wf.write(msg)
3629 sent.append(msg)
3630
3631 while True:
3632 try:
3633 wf.flush()
3634 break
3635 except self.BlockingIOError as e:
3636 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003637 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003638 self.assertEqual(e.characters_written, 0)
3639 received.append(rf.read())
3640
3641 received += iter(rf.read, None)
3642
3643 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003644 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003645 self.assertTrue(wf.closed)
3646 self.assertTrue(rf.closed)
3647
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003648 def test_create_fail(self):
3649 # 'x' mode fails if file is existing
3650 with self.open(support.TESTFN, 'w'):
3651 pass
3652 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3653
3654 def test_create_writes(self):
3655 # 'x' mode opens for writing
3656 with self.open(support.TESTFN, 'xb') as f:
3657 f.write(b"spam")
3658 with self.open(support.TESTFN, 'rb') as f:
3659 self.assertEqual(b"spam", f.read())
3660
Christian Heimes7b648752012-09-10 14:48:43 +02003661 def test_open_allargs(self):
3662 # there used to be a buffer overflow in the parser for rawmode
3663 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3664
3665
class CMiscIOTest(MiscIOTest):
    # MiscIOTest bound to the ``io`` module, plus checks that only this
    # class defines.
    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        class OversizedReader(self.io.BufferedIOBase):
            # read() hands back far more data than any caller asked for.
            def read(self, n=-1):
                return b'x' * 10**6
        target = bytearray(2)
        with self.assertRaises(ValueError):
            OversizedReader().readinto(target)

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        script = """if 1:
            import sys
            import time
            import threading

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """.format(stream_name=stream_name)
        result, _ = run_python_until_end("-c", script)
        stderr_text = result.err.decode()
        if result.rc == 0:
            # Clean exit: stderr may hold nothing but the written markers.
            self.assertFalse(stderr_text.strip('.!'))
        else:
            # Failure: should be a fatal error
            fatal_message = ("Fatal Python error: could not acquire lock "
                             "for <_io.BufferedWriter name='<{stream_name}>'> "
                             "at interpreter shutdown, possibly due to "
                             "daemon threads").format(stream_name=stream_name)
            self.assertIn(fatal_message, stderr_text)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stdout')

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stderr')
3719
3720
class PyMiscIOTest(MiscIOTest):
    # MiscIOTest bound to the ``pyio`` module.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003723
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003724
3725@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3726class SignalsTest(unittest.TestCase):
3727
3728 def setUp(self):
3729 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3730
3731 def tearDown(self):
3732 signal.signal(signal.SIGALRM, self.oldalrm)
3733
3734 def alarm_interrupt(self, sig, frame):
3735 1/0
3736
3737 @unittest.skipUnless(threading, 'Threading required for this test.')
3738 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3739 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00003740 invokes the signal handler, and bubbles up the exception raised
3741 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003742 read_results = []
3743 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02003744 if hasattr(signal, 'pthread_sigmask'):
3745 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003746 s = os.read(r, 1)
3747 read_results.append(s)
3748 t = threading.Thread(target=_read)
3749 t.daemon = True
3750 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00003751 fdopen_kwargs["closefd"] = False
Victor Stinner9c182a12015-03-16 18:03:06 +01003752 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003753 try:
3754 wio = self.io.open(w, **fdopen_kwargs)
3755 t.start()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003756 # Fill the pipe enough that the write will be blocking.
3757 # It will be interrupted by the timer armed above. Since the
3758 # other thread has read one byte, the low-level write will
3759 # return with a successful (partial) result rather than an EINTR.
3760 # The buffered IO layer must check for pending signal
3761 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02003762 signal.alarm(1)
3763 try:
Victor Stinner9c182a12015-03-16 18:03:06 +01003764 self.assertRaises(ZeroDivisionError, wio.write, large_data)
Victor Stinner775b2dd2013-07-15 19:53:13 +02003765 finally:
3766 signal.alarm(0)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003767 t.join()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003768 # We got one byte, get another one and check that it isn't a
3769 # repeat of the first one.
3770 read_results.append(os.read(r, 1))
3771 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3772 finally:
3773 os.close(w)
3774 os.close(r)
3775 # This is deliberate. If we didn't close the file descriptor
3776 # before closing wio, wio would try to flush its internal
3777 # buffer, and block again.
3778 try:
3779 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003780 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003781 if e.errno != errno.EBADF:
3782 raise
3783
3784 def test_interrupted_write_unbuffered(self):
3785 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3786
3787 def test_interrupted_write_buffered(self):
3788 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3789
Victor Stinner6ab72862014-09-03 23:32:28 +02003790 # Issue #22331: The test hangs on FreeBSD 7.2
3791 @support.requires_freebsd_version(8)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003792 def test_interrupted_write_text(self):
3793 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3794
Brett Cannon31f59292011-02-21 19:29:56 +00003795 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003796 def check_reentrant_write(self, data, **fdopen_kwargs):
3797 def on_alarm(*args):
3798 # Will be called reentrantly from the same thread
3799 wio.write(data)
3800 1/0
3801 signal.signal(signal.SIGALRM, on_alarm)
3802 r, w = os.pipe()
3803 wio = self.io.open(w, **fdopen_kwargs)
3804 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003805 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003806 # Either the reentrant call to wio.write() fails with RuntimeError,
3807 # or the signal handler raises ZeroDivisionError.
3808 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3809 while 1:
3810 for i in range(100):
3811 wio.write(data)
3812 wio.flush()
3813 # Make sure the buffer doesn't fill up and block further writes
3814 os.read(r, len(data) * 100)
3815 exc = cm.exception
3816 if isinstance(exc, RuntimeError):
3817 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3818 finally:
3819 wio.close()
3820 os.close(r)
3821
3822 def test_reentrant_write_buffered(self):
3823 self.check_reentrant_write(b"xy", mode="wb")
3824
3825 def test_reentrant_write_text(self):
3826 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3827
Antoine Pitrou707ce822011-02-25 21:24:11 +00003828 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3829 """Check that a buffered read, when it gets interrupted (either
3830 returning a partial result or EINTR), properly invokes the signal
3831 handler and retries if the latter returned successfully."""
3832 r, w = os.pipe()
3833 fdopen_kwargs["closefd"] = False
3834 def alarm_handler(sig, frame):
3835 os.write(w, b"bar")
3836 signal.signal(signal.SIGALRM, alarm_handler)
3837 try:
3838 rio = self.io.open(r, **fdopen_kwargs)
3839 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003840 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003841 # Expected behaviour:
3842 # - first raw read() returns partial b"foo"
3843 # - second raw read() returns EINTR
3844 # - third raw read() returns b"bar"
3845 self.assertEqual(decode(rio.read(6)), "foobar")
3846 finally:
3847 rio.close()
3848 os.close(w)
3849 os.close(r)
3850
Antoine Pitrou20db5112011-08-19 20:32:34 +02003851 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003852 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3853 mode="rb")
3854
Antoine Pitrou20db5112011-08-19 20:32:34 +02003855 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003856 self.check_interrupted_read_retry(lambda x: x,
3857 mode="r")
3858
3859 @unittest.skipUnless(threading, 'Threading required for this test.')
3860 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3861 """Check that a buffered write, when it gets interrupted (either
3862 returning a partial result or EINTR), properly invokes the signal
3863 handler and retries if the latter returned successfully."""
3864 select = support.import_module("select")
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003865
Antoine Pitrou707ce822011-02-25 21:24:11 +00003866 # A quantity that exceeds the buffer size of an anonymous pipe's
3867 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02003868 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00003869 r, w = os.pipe()
3870 fdopen_kwargs["closefd"] = False
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003871
Antoine Pitrou707ce822011-02-25 21:24:11 +00003872 # We need a separate thread to read from the pipe and allow the
3873 # write() to finish. This thread is started after the SIGALRM is
3874 # received (forcing a first EINTR in write()).
3875 read_results = []
3876 write_finished = False
Serhiy Storchaka9db55002015-03-28 20:38:37 +02003877 error = None
Antoine Pitrou707ce822011-02-25 21:24:11 +00003878 def _read():
Serhiy Storchaka9db55002015-03-28 20:38:37 +02003879 try:
3880 while not write_finished:
3881 while r in select.select([r], [], [], 1.0)[0]:
3882 s = os.read(r, 1024)
3883 read_results.append(s)
3884 except BaseException as exc:
3885 nonlocal error
3886 error = exc
Antoine Pitrou707ce822011-02-25 21:24:11 +00003887 t = threading.Thread(target=_read)
3888 t.daemon = True
3889 def alarm1(sig, frame):
3890 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003891 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003892 def alarm2(sig, frame):
3893 t.start()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003894
3895 large_data = item * N
Antoine Pitrou707ce822011-02-25 21:24:11 +00003896 signal.signal(signal.SIGALRM, alarm1)
3897 try:
3898 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003899 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003900 # Expected behaviour:
3901 # - first raw write() is partial (because of the limited pipe buffer
3902 # and the first alarm)
3903 # - second raw write() returns EINTR (because of the second alarm)
3904 # - subsequent write()s are successful (either partial or complete)
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003905 written = wio.write(large_data)
3906 self.assertEqual(N, written)
3907
Antoine Pitrou707ce822011-02-25 21:24:11 +00003908 wio.flush()
3909 write_finished = True
3910 t.join()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003911
Serhiy Storchaka9db55002015-03-28 20:38:37 +02003912 self.assertIsNone(error)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003913 self.assertEqual(N, sum(len(x) for x in read_results))
3914 finally:
3915 write_finished = True
3916 os.close(w)
3917 os.close(r)
3918 # This is deliberate. If we didn't close the file descriptor
3919 # before closing wio, wio would try to flush its internal
3920 # buffer, and could block (in case of failure).
3921 try:
3922 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003923 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00003924 if e.errno != errno.EBADF:
3925 raise
3926
Antoine Pitrou20db5112011-08-19 20:32:34 +02003927 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003928 self.check_interrupted_write_retry(b"x", mode="wb")
3929
Antoine Pitrou20db5112011-08-19 20:32:34 +02003930 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003931 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3932
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003933
class CSignalsTest(SignalsTest):
    # SignalsTest bound to the ``io`` module.
    io = io
3936
class PySignalsTest(SignalsTest):
    # SignalsTest bound to the ``pyio`` module.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3944
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003945
def load_tests(*args):
    """Build the suite of io tests.

    Before the tests are loaded, every class whose name starts with "C"
    receives the names of ``io`` and the C* mock classes as attributes,
    and every class whose name starts with "Py" receives those of
    ``pyio`` and the Py* mock classes.  The positional arguments are
    accepted and ignored.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name : getattr(io, name) for name in all_members}
    py_io_ns = {name : getattr(pyio, name) for name in all_members}
    globs = globals()
    # Each mock is looked up under its "C"/"Py"-prefixed global name but
    # exposed under the unprefixed name.
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # unittest.makeSuite() is deprecated since Python 3.11 and removed in
    # 3.13; a default TestLoader collects the same "test*" methods.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite([loader.loadTestsFromTestCase(test)
                                for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003981
# Run the tests when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()