blob: 6bb4127b0959be665bbcb5b3ffa1a16463dd957c [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020031import threading
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000032import time
Guido van Rossum28524c72007-02-27 05:47:44 +000033import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000034import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020035import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020036from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000038from test import support
Berker Peksagce643912015-05-06 06:33:17 +030039from test.support.script_helper import assert_python_ok, run_python_until_end
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000040
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000041import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000042import io # C implementation of io
43import _pyio as pyio # Python implementation of io
Guido van Rossuma9e20242007-03-08 00:43:48 +000044
Martin Panter6bb91f32016-05-28 00:41:57 +000045try:
46 import ctypes
47except ImportError:
48 def byteslike(*pos, **kw):
49 return array.array("b", bytes(*pos, **kw))
50else:
51 def byteslike(*pos, **kw):
52 """Create a bytes-like object having no string or sequence methods"""
53 data = bytes(*pos, **kw)
54 obj = EmptyStruct()
55 ctypes.resize(obj, len(data))
56 memoryview(obj).cast("B")[:] = data
57 return obj
58 class EmptyStruct(ctypes.Structure):
59 pass
60
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000061def _default_chunk_size():
62 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000063 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000064 return f._CHUNK_SIZE
65
66
Antoine Pitrou328ec742010-09-14 18:37:24 +000067class MockRawIOWithoutRead:
68 """A RawIO implementation without read(), so as to exercise the default
69 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000070
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000071 def __init__(self, read_stack=()):
72 self._read_stack = list(read_stack)
73 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000074 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000075 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000076
Guido van Rossum01a27522007-03-07 01:00:12 +000077 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000078 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000079 return len(b)
80
81 def writable(self):
82 return True
83
Guido van Rossum68bbcd22007-02-27 17:19:33 +000084 def fileno(self):
85 return 42
86
87 def readable(self):
88 return True
89
Guido van Rossum01a27522007-03-07 01:00:12 +000090 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000091 return True
92
Guido van Rossum01a27522007-03-07 01:00:12 +000093 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000094 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000095
96 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000097 return 0 # same comment as above
98
99 def readinto(self, buf):
100 self._reads += 1
101 max_len = len(buf)
102 try:
103 data = self._read_stack[0]
104 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000105 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000106 return 0
107 if data is None:
108 del self._read_stack[0]
109 return None
110 n = len(data)
111 if len(data) <= max_len:
112 del self._read_stack[0]
113 buf[:n] = data
114 return n
115 else:
116 buf[:] = data[:max_len]
117 self._read_stack[0] = data[max_len:]
118 return max_len
119
120 def truncate(self, pos=None):
121 return pos
122
Antoine Pitrou328ec742010-09-14 18:37:24 +0000123class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
124 pass
125
126class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
127 pass
128
129
130class MockRawIO(MockRawIOWithoutRead):
131
132 def read(self, n=None):
133 self._reads += 1
134 try:
135 return self._read_stack.pop(0)
136 except:
137 self._extraneous_reads += 1
138 return b""
139
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000140class CMockRawIO(MockRawIO, io.RawIOBase):
141 pass
142
143class PyMockRawIO(MockRawIO, pyio.RawIOBase):
144 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000145
Guido van Rossuma9e20242007-03-08 00:43:48 +0000146
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000147class MisbehavedRawIO(MockRawIO):
148 def write(self, b):
149 return super().write(b) * 2
150
151 def read(self, n=None):
152 return super().read(n) * 2
153
154 def seek(self, pos, whence):
155 return -123
156
157 def tell(self):
158 return -456
159
160 def readinto(self, buf):
161 super().readinto(buf)
162 return len(buf) * 5
163
164class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
165 pass
166
167class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
168 pass
169
170
benfogle9703f092017-11-10 16:03:40 -0500171class SlowFlushRawIO(MockRawIO):
172 def __init__(self):
173 super().__init__()
174 self.in_flush = threading.Event()
175
176 def flush(self):
177 self.in_flush.set()
178 time.sleep(0.25)
179
180class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
181 pass
182
183class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
184 pass
185
186
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000187class CloseFailureIO(MockRawIO):
188 closed = 0
189
190 def close(self):
191 if not self.closed:
192 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200193 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000194
195class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
196 pass
197
198class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
199 pass
200
201
202class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000203
204 def __init__(self, data):
205 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000206 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000207
208 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000209 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000210 self.read_history.append(None if res is None else len(res))
211 return res
212
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000213 def readinto(self, b):
214 res = super().readinto(b)
215 self.read_history.append(res)
216 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000217
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000218class CMockFileIO(MockFileIO, io.BytesIO):
219 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000220
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000221class PyMockFileIO(MockFileIO, pyio.BytesIO):
222 pass
223
224
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000225class MockUnseekableIO:
226 def seekable(self):
227 return False
228
229 def seek(self, *args):
230 raise self.UnsupportedOperation("not seekable")
231
232 def tell(self, *args):
233 raise self.UnsupportedOperation("not seekable")
234
Martin Panter754aab22016-03-31 07:21:56 +0000235 def truncate(self, *args):
236 raise self.UnsupportedOperation("not seekable")
237
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000238class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
239 UnsupportedOperation = io.UnsupportedOperation
240
241class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
242 UnsupportedOperation = pyio.UnsupportedOperation
243
244
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000245class MockNonBlockWriterIO:
246
247 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000248 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000249 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000250
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000251 def pop_written(self):
252 s = b"".join(self._write_stack)
253 self._write_stack[:] = []
254 return s
255
256 def block_on(self, char):
257 """Block when a given char is encountered."""
258 self._blocker_char = char
259
260 def readable(self):
261 return True
262
263 def seekable(self):
264 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000265
Guido van Rossum01a27522007-03-07 01:00:12 +0000266 def writable(self):
267 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000268
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000269 def write(self, b):
270 b = bytes(b)
271 n = -1
272 if self._blocker_char:
273 try:
274 n = b.index(self._blocker_char)
275 except ValueError:
276 pass
277 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100278 if n > 0:
279 # write data up to the first blocker
280 self._write_stack.append(b[:n])
281 return n
282 else:
283 # cancel blocker and indicate would block
284 self._blocker_char = None
285 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000286 self._write_stack.append(b)
287 return len(b)
288
289class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
290 BlockingIOError = io.BlockingIOError
291
292class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
293 BlockingIOError = pyio.BlockingIOError
294
Guido van Rossuma9e20242007-03-08 00:43:48 +0000295
Guido van Rossum28524c72007-02-27 05:47:44 +0000296class IOTest(unittest.TestCase):
297
Neal Norwitze7789b12008-03-24 06:18:09 +0000298 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000299 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000300
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000301 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000302 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000303
Guido van Rossum28524c72007-02-27 05:47:44 +0000304 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000305 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000306 f.truncate(0)
307 self.assertEqual(f.tell(), 5)
308 f.seek(0)
309
310 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000311 self.assertEqual(f.seek(0), 0)
312 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000313 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000314 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000315 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000316 buffer = bytearray(b" world\n\n\n")
317 self.assertEqual(f.write(buffer), 9)
318 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000319 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000320 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000321 self.assertEqual(f.seek(-1, 2), 13)
322 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000323
Guido van Rossum87429772007-04-10 21:06:59 +0000324 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000325 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000326 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000327
Guido van Rossum9b76da62007-04-11 01:09:03 +0000328 def read_ops(self, f, buffered=False):
329 data = f.read(5)
330 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000331 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000332 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000333 self.assertEqual(bytes(data), b" worl")
334 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000335 self.assertEqual(f.readinto(data), 2)
336 self.assertEqual(len(data), 5)
337 self.assertEqual(data[:2], b"d\n")
338 self.assertEqual(f.seek(0), 0)
339 self.assertEqual(f.read(20), b"hello world\n")
340 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000341 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000342 self.assertEqual(f.seek(-6, 2), 6)
343 self.assertEqual(f.read(5), b"world")
344 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000345 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000346 self.assertEqual(f.seek(-6, 1), 5)
347 self.assertEqual(f.read(5), b" worl")
348 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000349 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000350 if buffered:
351 f.seek(0)
352 self.assertEqual(f.read(), b"hello world\n")
353 f.seek(6)
354 self.assertEqual(f.read(), b"world\n")
355 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000356 f.seek(0)
357 data = byteslike(5)
358 self.assertEqual(f.readinto1(data), 5)
359 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000360
Guido van Rossum34d69e52007-04-10 20:08:41 +0000361 LARGE = 2**31
362
Guido van Rossum53807da2007-04-10 19:01:47 +0000363 def large_file_ops(self, f):
364 assert f.readable()
365 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100366 try:
367 self.assertEqual(f.seek(self.LARGE), self.LARGE)
368 except (OverflowError, ValueError):
369 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000370 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000371 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000372 self.assertEqual(f.tell(), self.LARGE + 3)
373 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000374 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000375 self.assertEqual(f.tell(), self.LARGE + 2)
376 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000377 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000378 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000379 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
380 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000381 self.assertEqual(f.read(2), b"x")
382
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000383 def test_invalid_operations(self):
384 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000385 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000386 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000387 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000388 self.assertRaises(exc, fp.read)
389 self.assertRaises(exc, fp.readline)
390 with self.open(support.TESTFN, "wb", buffering=0) as fp:
391 self.assertRaises(exc, fp.read)
392 self.assertRaises(exc, fp.readline)
393 with self.open(support.TESTFN, "rb", buffering=0) as fp:
394 self.assertRaises(exc, fp.write, b"blah")
395 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000396 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000397 self.assertRaises(exc, fp.write, b"blah")
398 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000399 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000400 self.assertRaises(exc, fp.write, "blah")
401 self.assertRaises(exc, fp.writelines, ["blah\n"])
402 # Non-zero seeking from current or end pos
403 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
404 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000405
Martin Panter754aab22016-03-31 07:21:56 +0000406 def test_optional_abilities(self):
407 # Test for OSError when optional APIs are not supported
408 # The purpose of this test is to try fileno(), reading, writing and
409 # seeking operations with various objects that indicate they do not
410 # support these operations.
411
412 def pipe_reader():
413 [r, w] = os.pipe()
414 os.close(w) # So that read() is harmless
415 return self.FileIO(r, "r")
416
417 def pipe_writer():
418 [r, w] = os.pipe()
419 self.addCleanup(os.close, r)
420 # Guarantee that we can write into the pipe without blocking
421 thread = threading.Thread(target=os.read, args=(r, 100))
422 thread.start()
423 self.addCleanup(thread.join)
424 return self.FileIO(w, "w")
425
426 def buffered_reader():
427 return self.BufferedReader(self.MockUnseekableIO())
428
429 def buffered_writer():
430 return self.BufferedWriter(self.MockUnseekableIO())
431
432 def buffered_random():
433 return self.BufferedRandom(self.BytesIO())
434
435 def buffered_rw_pair():
436 return self.BufferedRWPair(self.MockUnseekableIO(),
437 self.MockUnseekableIO())
438
439 def text_reader():
440 class UnseekableReader(self.MockUnseekableIO):
441 writable = self.BufferedIOBase.writable
442 write = self.BufferedIOBase.write
443 return self.TextIOWrapper(UnseekableReader(), "ascii")
444
445 def text_writer():
446 class UnseekableWriter(self.MockUnseekableIO):
447 readable = self.BufferedIOBase.readable
448 read = self.BufferedIOBase.read
449 return self.TextIOWrapper(UnseekableWriter(), "ascii")
450
451 tests = (
452 (pipe_reader, "fr"), (pipe_writer, "fw"),
453 (buffered_reader, "r"), (buffered_writer, "w"),
454 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
455 (text_reader, "r"), (text_writer, "w"),
456 (self.BytesIO, "rws"), (self.StringIO, "rws"),
457 )
458 for [test, abilities] in tests:
Martin Panter754aab22016-03-31 07:21:56 +0000459 with self.subTest(test), test() as obj:
460 readable = "r" in abilities
461 self.assertEqual(obj.readable(), readable)
462 writable = "w" in abilities
463 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000464
465 if isinstance(obj, self.TextIOBase):
466 data = "3"
467 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
468 data = b"3"
469 else:
470 self.fail("Unknown base class")
471
472 if "f" in abilities:
473 obj.fileno()
474 else:
475 self.assertRaises(OSError, obj.fileno)
476
477 if readable:
478 obj.read(1)
479 obj.read()
480 else:
481 self.assertRaises(OSError, obj.read, 1)
482 self.assertRaises(OSError, obj.read)
483
484 if writable:
485 obj.write(data)
486 else:
487 self.assertRaises(OSError, obj.write, data)
488
Martin Panter3ee147f2016-03-31 21:05:31 +0000489 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000490 pipe_reader, pipe_writer):
491 # Pipes seem to appear as seekable on Windows
492 continue
493 seekable = "s" in abilities
494 self.assertEqual(obj.seekable(), seekable)
495
Martin Panter754aab22016-03-31 07:21:56 +0000496 if seekable:
497 obj.tell()
498 obj.seek(0)
499 else:
500 self.assertRaises(OSError, obj.tell)
501 self.assertRaises(OSError, obj.seek, 0)
502
503 if writable and seekable:
504 obj.truncate()
505 obj.truncate(0)
506 else:
507 self.assertRaises(OSError, obj.truncate)
508 self.assertRaises(OSError, obj.truncate, 0)
509
Antoine Pitrou13348842012-01-29 18:36:34 +0100510 def test_open_handles_NUL_chars(self):
511 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300512 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100513
514 bytes_fn = bytes(fn_with_NUL, 'ascii')
515 with warnings.catch_warnings():
516 warnings.simplefilter("ignore", DeprecationWarning)
517 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100518
Guido van Rossum28524c72007-02-27 05:47:44 +0000519 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000520 with self.open(support.TESTFN, "wb", buffering=0) as f:
521 self.assertEqual(f.readable(), False)
522 self.assertEqual(f.writable(), True)
523 self.assertEqual(f.seekable(), True)
524 self.write_ops(f)
525 with self.open(support.TESTFN, "rb", buffering=0) as f:
526 self.assertEqual(f.readable(), True)
527 self.assertEqual(f.writable(), False)
528 self.assertEqual(f.seekable(), True)
529 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000530
Guido van Rossum87429772007-04-10 21:06:59 +0000531 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000532 with self.open(support.TESTFN, "wb") as f:
533 self.assertEqual(f.readable(), False)
534 self.assertEqual(f.writable(), True)
535 self.assertEqual(f.seekable(), True)
536 self.write_ops(f)
537 with self.open(support.TESTFN, "rb") as f:
538 self.assertEqual(f.readable(), True)
539 self.assertEqual(f.writable(), False)
540 self.assertEqual(f.seekable(), True)
541 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000542
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000543 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000544 with self.open(support.TESTFN, "wb") as f:
545 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
546 with self.open(support.TESTFN, "rb") as f:
547 self.assertEqual(f.readline(), b"abc\n")
548 self.assertEqual(f.readline(10), b"def\n")
549 self.assertEqual(f.readline(2), b"xy")
550 self.assertEqual(f.readline(4), b"zzy\n")
551 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000552 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000553 self.assertRaises(TypeError, f.readline, 5.3)
554 with self.open(support.TESTFN, "r") as f:
555 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000556
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300557 def test_readline_nonsizeable(self):
558 # Issue #30061
559 # Crash when readline() returns an object without __len__
560 class R(self.IOBase):
561 def readline(self):
562 return None
563 self.assertRaises((TypeError, StopIteration), next, R())
564
565 def test_next_nonsizeable(self):
566 # Issue #30061
567 # Crash when __next__() returns an object without __len__
568 class R(self.IOBase):
569 def __next__(self):
570 return None
571 self.assertRaises(TypeError, R().readlines, 1)
572
Guido van Rossum28524c72007-02-27 05:47:44 +0000573 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000574 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000575 self.write_ops(f)
576 data = f.getvalue()
577 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000578 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000579 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000580
Guido van Rossum53807da2007-04-10 19:01:47 +0000581 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000582 # On Windows and Mac OSX this test comsumes large resources; It takes
Victor Stinner8c663fd2017-11-08 14:44:44 -0800583 # a long time to build the >2 GiB file and takes >2 GiB of disk space
Guido van Rossum34d69e52007-04-10 20:08:41 +0000584 # therefore the resource must be enabled to run this test.
585 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600586 support.requires(
587 'largefile',
588 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000589 with self.open(support.TESTFN, "w+b", 0) as f:
590 self.large_file_ops(f)
591 with self.open(support.TESTFN, "w+b") as f:
592 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000593
594 def test_with_open(self):
595 for bufsize in (0, 1, 100):
596 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000597 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000598 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000599 self.assertEqual(f.closed, True)
600 f = None
601 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000602 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000603 1/0
604 except ZeroDivisionError:
605 self.assertEqual(f.closed, True)
606 else:
607 self.fail("1/0 didn't raise an exception")
608
Antoine Pitrou08838b62009-01-21 00:55:13 +0000609 # issue 5008
610 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000611 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000612 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000613 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000614 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000615 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000616 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000617 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300618 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000619
Guido van Rossum87429772007-04-10 21:06:59 +0000620 def test_destructor(self):
621 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000622 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000623 def __del__(self):
624 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000625 try:
626 f = super().__del__
627 except AttributeError:
628 pass
629 else:
630 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000631 def close(self):
632 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000633 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000634 def flush(self):
635 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000636 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000637 with support.check_warnings(('', ResourceWarning)):
638 f = MyFileIO(support.TESTFN, "wb")
639 f.write(b"xxx")
640 del f
641 support.gc_collect()
642 self.assertEqual(record, [1, 2, 3])
643 with self.open(support.TESTFN, "rb") as f:
644 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000645
646 def _check_base_destructor(self, base):
647 record = []
648 class MyIO(base):
649 def __init__(self):
650 # This exercises the availability of attributes on object
651 # destruction.
652 # (in the C version, close() is called by the tp_dealloc
653 # function, not by __del__)
654 self.on_del = 1
655 self.on_close = 2
656 self.on_flush = 3
657 def __del__(self):
658 record.append(self.on_del)
659 try:
660 f = super().__del__
661 except AttributeError:
662 pass
663 else:
664 f()
665 def close(self):
666 record.append(self.on_close)
667 super().close()
668 def flush(self):
669 record.append(self.on_flush)
670 super().flush()
671 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000672 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000673 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000674 self.assertEqual(record, [1, 2, 3])
675
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000676 def test_IOBase_destructor(self):
677 self._check_base_destructor(self.IOBase)
678
679 def test_RawIOBase_destructor(self):
680 self._check_base_destructor(self.RawIOBase)
681
682 def test_BufferedIOBase_destructor(self):
683 self._check_base_destructor(self.BufferedIOBase)
684
685 def test_TextIOBase_destructor(self):
686 self._check_base_destructor(self.TextIOBase)
687
Guido van Rossum87429772007-04-10 21:06:59 +0000688 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000689 with self.open(support.TESTFN, "wb") as f:
690 f.write(b"xxx")
691 with self.open(support.TESTFN, "rb") as f:
692 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000693
Guido van Rossumd4103952007-04-12 05:44:49 +0000694 def test_array_writes(self):
695 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000696 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000697 def check(f):
698 with f:
699 self.assertEqual(f.write(a), n)
700 f.writelines((a,))
701 check(self.BytesIO())
702 check(self.FileIO(support.TESTFN, "w"))
703 check(self.BufferedWriter(self.MockRawIO()))
704 check(self.BufferedRandom(self.MockRawIO()))
705 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000706
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000707 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000708 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000709 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000710
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000711 def test_read_closed(self):
712 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000713 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000714 with self.open(support.TESTFN, "r") as f:
715 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000716 self.assertEqual(file.read(), "egg\n")
717 file.seek(0)
718 file.close()
719 self.assertRaises(ValueError, file.read)
720
721 def test_no_closefd_with_filename(self):
722 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000723 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000724
725 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000726 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000727 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000728 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000729 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000730 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000731 self.assertEqual(file.buffer.raw.closefd, False)
732
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000733 def test_garbage_collection(self):
734 # FileIO objects are collected, and collecting them flushes
735 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000736 with support.check_warnings(('', ResourceWarning)):
737 f = self.FileIO(support.TESTFN, "wb")
738 f.write(b"abcxxx")
739 f.f = f
740 wr = weakref.ref(f)
741 del f
742 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300743 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000744 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000745 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000746
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000747 def test_unbounded_file(self):
748 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
749 zero = "/dev/zero"
750 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000751 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000752 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000753 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000754 if support.real_max_memuse < support._2G:
Victor Stinner8c663fd2017-11-08 14:44:44 -0800755 self.skipTest("test requires at least 2 GiB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000756 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000757 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000758 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000759 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000760 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000761 self.assertRaises(OverflowError, f.read)
762
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200763 def check_flush_error_on_close(self, *args, **kwargs):
764 # Test that the file is closed despite failed flush
765 # and that flush() is called before file closed.
766 f = self.open(*args, **kwargs)
767 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000768 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200769 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200770 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000771 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200772 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600773 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200774 self.assertTrue(closed) # flush() called
775 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200776 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200777
778 def test_flush_error_on_close(self):
779 # raw file
780 # Issue #5700: io.FileIO calls flush() after file closed
781 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
782 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
783 self.check_flush_error_on_close(fd, 'wb', buffering=0)
784 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
785 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
786 os.close(fd)
787 # buffered io
788 self.check_flush_error_on_close(support.TESTFN, 'wb')
789 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
790 self.check_flush_error_on_close(fd, 'wb')
791 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
792 self.check_flush_error_on_close(fd, 'wb', closefd=False)
793 os.close(fd)
794 # text io
795 self.check_flush_error_on_close(support.TESTFN, 'w')
796 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
797 self.check_flush_error_on_close(fd, 'w')
798 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
799 self.check_flush_error_on_close(fd, 'w', closefd=False)
800 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000801
802 def test_multi_close(self):
803 f = self.open(support.TESTFN, "wb", buffering=0)
804 f.close()
805 f.close()
806 f.close()
807 self.assertRaises(ValueError, f.flush)
808
Antoine Pitrou328ec742010-09-14 18:37:24 +0000809 def test_RawIOBase_read(self):
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530810 # Exercise the default limited RawIOBase.read(n) implementation (which
811 # calls readinto() internally).
Antoine Pitrou328ec742010-09-14 18:37:24 +0000812 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
813 self.assertEqual(rawio.read(2), b"ab")
814 self.assertEqual(rawio.read(2), b"c")
815 self.assertEqual(rawio.read(2), b"d")
816 self.assertEqual(rawio.read(2), None)
817 self.assertEqual(rawio.read(2), b"ef")
818 self.assertEqual(rawio.read(2), b"g")
819 self.assertEqual(rawio.read(2), None)
820 self.assertEqual(rawio.read(2), b"")
821
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400822 def test_types_have_dict(self):
823 test = (
824 self.IOBase(),
825 self.RawIOBase(),
826 self.TextIOBase(),
827 self.StringIO(),
828 self.BytesIO()
829 )
830 for obj in test:
831 self.assertTrue(hasattr(obj, "__dict__"))
832
Ross Lagerwall59142db2011-10-31 20:34:46 +0200833 def test_opener(self):
834 with self.open(support.TESTFN, "w") as f:
835 f.write("egg\n")
836 fd = os.open(support.TESTFN, os.O_RDONLY)
837 def opener(path, flags):
838 return fd
839 with self.open("non-existent", "r", opener=opener) as f:
840 self.assertEqual(f.read(), "egg\n")
841
Barry Warsaw480e2852016-06-08 17:47:26 -0400842 def test_bad_opener_negative_1(self):
843 # Issue #27066.
844 def badopener(fname, flags):
845 return -1
846 with self.assertRaises(ValueError) as cm:
847 open('non-existent', 'r', opener=badopener)
848 self.assertEqual(str(cm.exception), 'opener returned -1')
849
850 def test_bad_opener_other_negative(self):
851 # Issue #27066.
852 def badopener(fname, flags):
853 return -2
854 with self.assertRaises(ValueError) as cm:
855 open('non-existent', 'r', opener=badopener)
856 self.assertEqual(str(cm.exception), 'opener returned -2')
857
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200858 def test_fileio_closefd(self):
859 # Issue #4841
860 with self.open(__file__, 'rb') as f1, \
861 self.open(__file__, 'rb') as f2:
862 fileio = self.FileIO(f1.fileno(), closefd=False)
863 # .__init__() must not close f1
864 fileio.__init__(f2.fileno(), closefd=False)
865 f1.readline()
866 # .close() must not close f2
867 fileio.close()
868 f2.readline()
869
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300870 def test_nonbuffered_textio(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200871 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300872 with self.assertRaises(ValueError):
873 self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300874
875 def test_invalid_newline(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200876 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300877 with self.assertRaises(ValueError):
878 self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300879
Martin Panter6bb91f32016-05-28 00:41:57 +0000880 def test_buffered_readinto_mixin(self):
881 # Test the implementation provided by BufferedIOBase
882 class Stream(self.BufferedIOBase):
883 def read(self, size):
884 return b"12345"
885 read1 = read
886 stream = Stream()
887 for method in ("readinto", "readinto1"):
888 with self.subTest(method):
889 buffer = byteslike(5)
890 self.assertEqual(getattr(stream, method)(buffer), 5)
891 self.assertEqual(bytes(buffer), b"12345")
892
Ethan Furmand62548a2016-06-04 14:38:43 -0700893 def test_fspath_support(self):
894 class PathLike:
895 def __init__(self, path):
896 self.path = path
897
898 def __fspath__(self):
899 return self.path
900
901 def check_path_succeeds(path):
902 with self.open(path, "w") as f:
903 f.write("egg\n")
904
905 with self.open(path, "r") as f:
906 self.assertEqual(f.read(), "egg\n")
907
908 check_path_succeeds(PathLike(support.TESTFN))
909 check_path_succeeds(PathLike(support.TESTFN.encode('utf-8')))
910
911 bad_path = PathLike(TypeError)
Brett Cannonc78ca1e2016-06-24 12:03:43 -0700912 with self.assertRaises(TypeError):
Ethan Furmand62548a2016-06-04 14:38:43 -0700913 self.open(bad_path, 'w')
914
915 # ensure that refcounting is correct with some error conditions
916 with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
917 self.open(PathLike(support.TESTFN), 'rwxa')
918
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530919 def test_RawIOBase_readall(self):
920 # Exercise the default unlimited RawIOBase.read() and readall()
921 # implementations.
922 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
923 self.assertEqual(rawio.read(), b"abcdefg")
924 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
925 self.assertEqual(rawio.readall(), b"abcdefg")
926
927 def test_BufferedIOBase_readinto(self):
928 # Exercise the default BufferedIOBase.readinto() and readinto1()
929 # implementations (which call read() or read1() internally).
930 class Reader(self.BufferedIOBase):
931 def __init__(self, avail):
932 self.avail = avail
933 def read(self, size):
934 result = self.avail[:size]
935 self.avail = self.avail[size:]
936 return result
937 def read1(self, size):
938 """Returns no more than 5 bytes at once"""
939 return self.read(min(size, 5))
940 tests = (
941 # (test method, total data available, read buffer size, expected
942 # read size)
943 ("readinto", 10, 5, 5),
944 ("readinto", 10, 6, 6), # More than read1() can return
945 ("readinto", 5, 6, 5), # Buffer larger than total available
946 ("readinto", 6, 7, 6),
947 ("readinto", 10, 0, 0), # Empty buffer
948 ("readinto1", 10, 5, 5), # Result limited to single read1() call
949 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return
950 ("readinto1", 5, 6, 5), # Buffer larger than total available
951 ("readinto1", 6, 7, 5),
952 ("readinto1", 10, 0, 0), # Empty buffer
953 )
954 UNUSED_BYTE = 0x81
955 for test in tests:
956 with self.subTest(test):
957 method, avail, request, result = test
958 reader = Reader(bytes(range(avail)))
959 buffer = bytearray((UNUSED_BYTE,) * request)
960 method = getattr(reader, method)
961 self.assertEqual(method(buffer), result)
962 self.assertEqual(len(buffer), request)
963 self.assertSequenceEqual(buffer[:result], range(result))
964 unused = (UNUSED_BYTE,) * (request - result)
965 self.assertSequenceEqual(buffer[result:], unused)
966 self.assertEqual(len(reader.avail), avail - result)
967
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200968
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000969class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200970
971 def test_IOBase_finalize(self):
972 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
973 # class which inherits IOBase and an object of this class are caught
974 # in a reference cycle and close() is already in the method cache.
975 class MyIO(self.IOBase):
976 def close(self):
977 pass
978
979 # create an instance to populate the method cache
980 MyIO()
981 obj = MyIO()
982 obj.obj = obj
983 wr = weakref.ref(obj)
984 del MyIO
985 del obj
986 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300987 self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000988
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000989class PyIOTest(IOTest):
990 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000991
Guido van Rossuma9e20242007-03-08 00:43:48 +0000992
Gregory P. Smith1bef9072015-04-14 13:24:34 -0700993@support.cpython_only
994class APIMismatchTest(unittest.TestCase):
Gregory P. Smith054b0652015-04-14 12:58:05 -0700995
Gregory P. Smith054b0652015-04-14 12:58:05 -0700996 def test_RawIOBase_io_in_pyio_match(self):
997 """Test that pyio RawIOBase class has all c RawIOBase methods"""
Antoine Pitrou45d61562015-05-20 21:50:59 +0200998 mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
999 ignore=('__weakref__',))
Gregory P. Smith054b0652015-04-14 12:58:05 -07001000 self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
1001
1002 def test_RawIOBase_pyio_in_io_match(self):
1003 """Test that c RawIOBase class has all pyio RawIOBase methods"""
1004 mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
1005 self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
1006
1007
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001008class CommonBufferedTests:
1009 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
1010
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001011 def test_detach(self):
1012 raw = self.MockRawIO()
1013 buf = self.tp(raw)
1014 self.assertIs(buf.detach(), raw)
1015 self.assertRaises(ValueError, buf.detach)
1016
Benjamin Peterson10e76b62014-12-21 20:51:50 -06001017 repr(buf) # Should still work
1018
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001019 def test_fileno(self):
1020 rawio = self.MockRawIO()
1021 bufio = self.tp(rawio)
1022
Ezio Melottib3aedd42010-11-20 19:04:17 +00001023 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001024
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001025 def test_invalid_args(self):
1026 rawio = self.MockRawIO()
1027 bufio = self.tp(rawio)
1028 # Invalid whence
1029 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +02001030 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001031
1032 def test_override_destructor(self):
1033 tp = self.tp
1034 record = []
1035 class MyBufferedIO(tp):
1036 def __del__(self):
1037 record.append(1)
1038 try:
1039 f = super().__del__
1040 except AttributeError:
1041 pass
1042 else:
1043 f()
1044 def close(self):
1045 record.append(2)
1046 super().close()
1047 def flush(self):
1048 record.append(3)
1049 super().flush()
1050 rawio = self.MockRawIO()
1051 bufio = MyBufferedIO(rawio)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001052 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001053 support.gc_collect()
Martin Panter754aab22016-03-31 07:21:56 +00001054 self.assertEqual(record, [1, 2, 3])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001055
1056 def test_context_manager(self):
1057 # Test usability as a context manager
1058 rawio = self.MockRawIO()
1059 bufio = self.tp(rawio)
1060 def _with():
1061 with bufio:
1062 pass
1063 _with()
1064 # bufio should now be closed, and using it a second time should raise
1065 # a ValueError.
1066 self.assertRaises(ValueError, _with)
1067
1068 def test_error_through_destructor(self):
1069 # Test that the exception state is not modified by a destructor,
1070 # even if close() fails.
1071 rawio = self.CloseFailureIO()
1072 def f():
1073 self.tp(rawio).xyzzy
1074 with support.captured_output("stderr") as s:
1075 self.assertRaises(AttributeError, f)
1076 s = s.getvalue().strip()
1077 if s:
1078 # The destructor *may* have printed an unraisable error, check it
1079 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001080 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001081 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +00001082
Antoine Pitrou716c4442009-05-23 19:04:03 +00001083 def test_repr(self):
1084 raw = self.MockRawIO()
1085 b = self.tp(raw)
Serhiy Storchaka521e5862014-07-22 15:00:37 +03001086 clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001087 self.assertEqual(repr(b), "<%s>" % clsname)
1088 raw.name = "dummy"
1089 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
1090 raw.name = b"dummy"
1091 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
1092
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02001093 def test_recursive_repr(self):
1094 # Issue #25455
1095 raw = self.MockRawIO()
1096 b = self.tp(raw)
1097 with support.swap_attr(raw, 'name', b):
1098 try:
1099 repr(b) # Should not crash
1100 except RuntimeError:
1101 pass
1102
Antoine Pitrou6be88762010-05-03 16:48:20 +00001103 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001104 # Test that buffered file is closed despite failed flush
1105 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00001106 raw = self.MockRawIO()
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001107 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00001108 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001109 closed[:] = [b.closed, raw.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001110 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00001111 raw.flush = bad_flush
1112 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001113 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001114 self.assertTrue(b.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001115 self.assertTrue(raw.closed)
1116 self.assertTrue(closed) # flush() called
1117 self.assertFalse(closed[0]) # flush() called before file closed
1118 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02001119 raw.flush = lambda: None # break reference loop
Benjamin Peterson68623612012-12-20 11:53:11 -06001120
1121 def test_close_error_on_close(self):
1122 raw = self.MockRawIO()
1123 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001124 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -06001125 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001126 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -06001127 raw.close = bad_close
1128 b = self.tp(raw)
1129 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001130 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001131 b.close()
1132 self.assertEqual(err.exception.args, ('close',))
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001133 self.assertIsInstance(err.exception.__context__, OSError)
Benjamin Peterson68623612012-12-20 11:53:11 -06001134 self.assertEqual(err.exception.__context__.args, ('flush',))
1135 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00001136
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001137 def test_nonnormalized_close_error_on_close(self):
1138 # Issue #21677
1139 raw = self.MockRawIO()
1140 def bad_flush():
1141 raise non_existing_flush
1142 def bad_close():
1143 raise non_existing_close
1144 raw.close = bad_close
1145 b = self.tp(raw)
1146 b.flush = bad_flush
1147 with self.assertRaises(NameError) as err: # exception not swallowed
1148 b.close()
1149 self.assertIn('non_existing_close', str(err.exception))
1150 self.assertIsInstance(err.exception.__context__, NameError)
1151 self.assertIn('non_existing_flush', str(err.exception.__context__))
1152 self.assertFalse(b.closed)
1153
Antoine Pitrou6be88762010-05-03 16:48:20 +00001154 def test_multi_close(self):
1155 raw = self.MockRawIO()
1156 b = self.tp(raw)
1157 b.close()
1158 b.close()
1159 b.close()
1160 self.assertRaises(ValueError, b.flush)
1161
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001162 def test_unseekable(self):
1163 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1164 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1165 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1166
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00001167 def test_readonly_attributes(self):
1168 raw = self.MockRawIO()
1169 buf = self.tp(raw)
1170 x = self.MockRawIO()
1171 with self.assertRaises(AttributeError):
1172 buf.raw = x
1173
Guido van Rossum78892e42007-04-06 17:31:18 +00001174
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001175class SizeofTest:
1176
1177 @support.cpython_only
1178 def test_sizeof(self):
1179 bufsize1 = 4096
1180 bufsize2 = 8192
1181 rawio = self.MockRawIO()
1182 bufio = self.tp(rawio, buffer_size=bufsize1)
1183 size = sys.getsizeof(bufio) - bufsize1
1184 rawio = self.MockRawIO()
1185 bufio = self.tp(rawio, buffer_size=bufsize2)
1186 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
1187
Jesus Ceadc469452012-10-04 12:37:56 +02001188 @support.cpython_only
1189 def test_buffer_freeing(self) :
1190 bufsize = 4096
1191 rawio = self.MockRawIO()
1192 bufio = self.tp(rawio, buffer_size=bufsize)
1193 size = sys.getsizeof(bufio) - bufsize
1194 bufio.close()
1195 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001196
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001197class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1198 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001199
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001200 def test_constructor(self):
1201 rawio = self.MockRawIO([b"abc"])
1202 bufio = self.tp(rawio)
1203 bufio.__init__(rawio)
1204 bufio.__init__(rawio, buffer_size=1024)
1205 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001206 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001207 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1208 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1209 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1210 rawio = self.MockRawIO([b"abc"])
1211 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001212 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001213
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001214 def test_uninitialized(self):
1215 bufio = self.tp.__new__(self.tp)
1216 del bufio
1217 bufio = self.tp.__new__(self.tp)
1218 self.assertRaisesRegex((ValueError, AttributeError),
1219 'uninitialized|has no attribute',
1220 bufio.read, 0)
1221 bufio.__init__(self.MockRawIO())
1222 self.assertEqual(bufio.read(0), b'')
1223
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001224 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001225 for arg in (None, 7):
1226 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1227 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001228 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001229 # Invalid args
1230 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001231
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001232 def test_read1(self):
1233 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1234 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001235 self.assertEqual(b"a", bufio.read(1))
1236 self.assertEqual(b"b", bufio.read1(1))
1237 self.assertEqual(rawio._reads, 1)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001238 self.assertEqual(b"", bufio.read1(0))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001239 self.assertEqual(b"c", bufio.read1(100))
1240 self.assertEqual(rawio._reads, 1)
1241 self.assertEqual(b"d", bufio.read1(100))
1242 self.assertEqual(rawio._reads, 2)
1243 self.assertEqual(b"efg", bufio.read1(100))
1244 self.assertEqual(rawio._reads, 3)
1245 self.assertEqual(b"", bufio.read1(100))
1246 self.assertEqual(rawio._reads, 4)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001247
1248 def test_read1_arbitrary(self):
1249 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1250 bufio = self.tp(rawio)
1251 self.assertEqual(b"a", bufio.read(1))
1252 self.assertEqual(b"bc", bufio.read1())
1253 self.assertEqual(b"d", bufio.read1())
1254 self.assertEqual(b"efg", bufio.read1(-1))
1255 self.assertEqual(rawio._reads, 3)
1256 self.assertEqual(b"", bufio.read1())
1257 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001258
1259 def test_readinto(self):
1260 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1261 bufio = self.tp(rawio)
1262 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001263 self.assertEqual(bufio.readinto(b), 2)
1264 self.assertEqual(b, b"ab")
1265 self.assertEqual(bufio.readinto(b), 2)
1266 self.assertEqual(b, b"cd")
1267 self.assertEqual(bufio.readinto(b), 2)
1268 self.assertEqual(b, b"ef")
1269 self.assertEqual(bufio.readinto(b), 1)
1270 self.assertEqual(b, b"gf")
1271 self.assertEqual(bufio.readinto(b), 0)
1272 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +02001273 rawio = self.MockRawIO((b"abc", None))
1274 bufio = self.tp(rawio)
1275 self.assertEqual(bufio.readinto(b), 2)
1276 self.assertEqual(b, b"ab")
1277 self.assertEqual(bufio.readinto(b), 1)
1278 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001279
Benjamin Petersona96fea02014-06-22 14:17:44 -07001280 def test_readinto1(self):
1281 buffer_size = 10
1282 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1283 bufio = self.tp(rawio, buffer_size=buffer_size)
1284 b = bytearray(2)
1285 self.assertEqual(bufio.peek(3), b'abc')
1286 self.assertEqual(rawio._reads, 1)
1287 self.assertEqual(bufio.readinto1(b), 2)
1288 self.assertEqual(b, b"ab")
1289 self.assertEqual(rawio._reads, 1)
1290 self.assertEqual(bufio.readinto1(b), 1)
1291 self.assertEqual(b[:1], b"c")
1292 self.assertEqual(rawio._reads, 1)
1293 self.assertEqual(bufio.readinto1(b), 2)
1294 self.assertEqual(b, b"de")
1295 self.assertEqual(rawio._reads, 2)
1296 b = bytearray(2*buffer_size)
1297 self.assertEqual(bufio.peek(3), b'fgh')
1298 self.assertEqual(rawio._reads, 3)
1299 self.assertEqual(bufio.readinto1(b), 6)
1300 self.assertEqual(b[:6], b"fghjkl")
1301 self.assertEqual(rawio._reads, 4)
1302
1303 def test_readinto_array(self):
1304 buffer_size = 60
1305 data = b"a" * 26
1306 rawio = self.MockRawIO((data,))
1307 bufio = self.tp(rawio, buffer_size=buffer_size)
1308
1309 # Create an array with element size > 1 byte
1310 b = array.array('i', b'x' * 32)
1311 assert len(b) != 16
1312
1313 # Read into it. We should get as many *bytes* as we can fit into b
1314 # (which is more than the number of elements)
1315 n = bufio.readinto(b)
1316 self.assertGreater(n, len(b))
1317
1318 # Check that old contents of b are preserved
1319 bm = memoryview(b).cast('B')
1320 self.assertLess(n, len(bm))
1321 self.assertEqual(bm[:n], data[:n])
1322 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1323
1324 def test_readinto1_array(self):
1325 buffer_size = 60
1326 data = b"a" * 26
1327 rawio = self.MockRawIO((data,))
1328 bufio = self.tp(rawio, buffer_size=buffer_size)
1329
1330 # Create an array with element size > 1 byte
1331 b = array.array('i', b'x' * 32)
1332 assert len(b) != 16
1333
1334 # Read into it. We should get as many *bytes* as we can fit into b
1335 # (which is more than the number of elements)
1336 n = bufio.readinto1(b)
1337 self.assertGreater(n, len(b))
1338
1339 # Check that old contents of b are preserved
1340 bm = memoryview(b).cast('B')
1341 self.assertLess(n, len(bm))
1342 self.assertEqual(bm[:n], data[:n])
1343 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1344
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001345 def test_readlines(self):
1346 def bufio():
1347 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1348 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001349 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1350 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1351 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001352
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001353 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001354 data = b"abcdefghi"
1355 dlen = len(data)
1356
1357 tests = [
1358 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1359 [ 100, [ 3, 3, 3], [ dlen ] ],
1360 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1361 ]
1362
1363 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001364 rawio = self.MockFileIO(data)
1365 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001366 pos = 0
1367 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001368 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001369 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001370 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001371 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001372
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001373 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +00001374 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001375 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1376 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001377 self.assertEqual(b"abcd", bufio.read(6))
1378 self.assertEqual(b"e", bufio.read(1))
1379 self.assertEqual(b"fg", bufio.read())
1380 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +02001381 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001382 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +00001383
Victor Stinnera80987f2011-05-25 22:47:16 +02001384 rawio = self.MockRawIO((b"a", None, None))
1385 self.assertEqual(b"a", rawio.readall())
1386 self.assertIsNone(rawio.readall())
1387
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001388 def test_read_past_eof(self):
1389 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1390 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001391
Ezio Melottib3aedd42010-11-20 19:04:17 +00001392 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001393
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001394 def test_read_all(self):
1395 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1396 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001397
Ezio Melottib3aedd42010-11-20 19:04:17 +00001398 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001399
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001400 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001401 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001402 try:
1403 # Write out many bytes with exactly the same number of 0's,
1404 # 1's... 255's. This will help us check that concurrent reading
1405 # doesn't duplicate or forget contents.
1406 N = 1000
1407 l = list(range(256)) * N
1408 random.shuffle(l)
1409 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001410 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +00001411 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001412 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001413 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001414 errors = []
1415 results = []
1416 def f():
1417 try:
1418 # Intra-buffer read then buffer-flushing read
1419 for n in cycle([1, 19]):
1420 s = bufio.read(n)
1421 if not s:
1422 break
1423 # list.append() is atomic
1424 results.append(s)
1425 except Exception as e:
1426 errors.append(e)
1427 raise
1428 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001429 with support.start_threads(threads):
1430 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001431 self.assertFalse(errors,
1432 "the following exceptions were caught: %r" % errors)
1433 s = b''.join(results)
1434 for i in range(256):
1435 c = bytes(bytearray([i]))
1436 self.assertEqual(s.count(c), N)
1437 finally:
1438 support.unlink(support.TESTFN)
1439
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001440 def test_unseekable(self):
1441 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1442 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1443 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1444 bufio.read(1)
1445 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1446 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1447
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001448 def test_misbehaved_io(self):
1449 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1450 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001451 self.assertRaises(OSError, bufio.seek, 0)
1452 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001453
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001454 def test_no_extraneous_read(self):
1455 # Issue #9550; when the raw IO object has satisfied the read request,
1456 # we should not issue any additional reads, otherwise it may block
1457 # (e.g. socket).
1458 bufsize = 16
1459 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1460 rawio = self.MockRawIO([b"x" * n])
1461 bufio = self.tp(rawio, bufsize)
1462 self.assertEqual(bufio.read(n), b"x" * n)
1463 # Simple case: one raw read is enough to satisfy the request.
1464 self.assertEqual(rawio._extraneous_reads, 0,
1465 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1466 # A more complex case where two raw reads are needed to satisfy
1467 # the request.
1468 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1469 bufio = self.tp(rawio, bufsize)
1470 self.assertEqual(bufio.read(n), b"x" * n)
1471 self.assertEqual(rawio._extraneous_reads, 0,
1472 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1473
Berker Peksagd10d6ae2015-05-12 17:01:05 +03001474 def test_read_on_closed(self):
1475 # Issue #23796
1476 b = io.BufferedReader(io.BytesIO(b"12"))
1477 b.read(1)
1478 b.close()
1479 self.assertRaises(ValueError, b.peek)
1480 self.assertRaises(ValueError, b.read1, 1)
1481
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001482
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001483class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001484 tp = io.BufferedReader
1485
1486 def test_constructor(self):
1487 BufferedReaderTest.test_constructor(self)
1488 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08001489 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001490 if sys.maxsize > 0x7FFFFFFF:
1491 rawio = self.MockRawIO()
1492 bufio = self.tp(rawio)
1493 self.assertRaises((OverflowError, MemoryError, ValueError),
1494 bufio.__init__, rawio, sys.maxsize)
1495
1496 def test_initialization(self):
1497 rawio = self.MockRawIO([b"abc"])
1498 bufio = self.tp(rawio)
1499 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1500 self.assertRaises(ValueError, bufio.read)
1501 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1502 self.assertRaises(ValueError, bufio.read)
1503 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1504 self.assertRaises(ValueError, bufio.read)
1505
1506 def test_misbehaved_io_read(self):
1507 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1508 bufio = self.tp(rawio)
1509 # _pyio.BufferedReader seems to implement reading different, so that
1510 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001511 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001512
1513 def test_garbage_collection(self):
1514 # C BufferedReader objects are collected.
1515 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001516 with support.check_warnings(('', ResourceWarning)):
1517 rawio = self.FileIO(support.TESTFN, "w+b")
1518 f = self.tp(rawio)
1519 f.f = f
1520 wr = weakref.ref(f)
1521 del f
1522 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001523 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001524
R David Murray67bfe802013-02-23 21:51:05 -05001525 def test_args_error(self):
1526 # Issue #17275
1527 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1528 self.tp(io.BytesIO(), 1024, 1024, 1024)
1529
1530
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001531class PyBufferedReaderTest(BufferedReaderTest):
1532 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001533
Guido van Rossuma9e20242007-03-08 00:43:48 +00001534
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001535class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1536 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001537
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001538 def test_constructor(self):
1539 rawio = self.MockRawIO()
1540 bufio = self.tp(rawio)
1541 bufio.__init__(rawio)
1542 bufio.__init__(rawio, buffer_size=1024)
1543 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001544 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001545 bufio.flush()
1546 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1547 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1548 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1549 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001550 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001551 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001552 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001553
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001554 def test_uninitialized(self):
1555 bufio = self.tp.__new__(self.tp)
1556 del bufio
1557 bufio = self.tp.__new__(self.tp)
1558 self.assertRaisesRegex((ValueError, AttributeError),
1559 'uninitialized|has no attribute',
1560 bufio.write, b'')
1561 bufio.__init__(self.MockRawIO())
1562 self.assertEqual(bufio.write(b''), 0)
1563
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001564 def test_detach_flush(self):
1565 raw = self.MockRawIO()
1566 buf = self.tp(raw)
1567 buf.write(b"howdy!")
1568 self.assertFalse(raw._write_stack)
1569 buf.detach()
1570 self.assertEqual(raw._write_stack, [b"howdy!"])
1571
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001572 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001573 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001574 writer = self.MockRawIO()
1575 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001576 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001577 self.assertFalse(writer._write_stack)
Martin Panter6bb91f32016-05-28 00:41:57 +00001578 buffer = bytearray(b"def")
1579 bufio.write(buffer)
1580 buffer[:] = b"***" # Overwrite our copy of the data
1581 bufio.flush()
1582 self.assertEqual(b"".join(writer._write_stack), b"abcdef")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001583
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001584 def test_write_overflow(self):
1585 writer = self.MockRawIO()
1586 bufio = self.tp(writer, 8)
1587 contents = b"abcdefghijklmnop"
1588 for n in range(0, len(contents), 3):
1589 bufio.write(contents[n:n+3])
1590 flushed = b"".join(writer._write_stack)
1591 # At least (total - 8) bytes were implicitly flushed, perhaps more
1592 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001593 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001594
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001595 def check_writes(self, intermediate_func):
1596 # Lots of writes, test the flushed output is as expected.
1597 contents = bytes(range(256)) * 1000
1598 n = 0
1599 writer = self.MockRawIO()
1600 bufio = self.tp(writer, 13)
1601 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1602 def gen_sizes():
1603 for size in count(1):
1604 for i in range(15):
1605 yield size
1606 sizes = gen_sizes()
1607 while n < len(contents):
1608 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001609 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001610 intermediate_func(bufio)
1611 n += size
1612 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001613 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001614
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001615 def test_writes(self):
1616 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001617
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001618 def test_writes_and_flushes(self):
1619 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001620
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001621 def test_writes_and_seeks(self):
1622 def _seekabs(bufio):
1623 pos = bufio.tell()
1624 bufio.seek(pos + 1, 0)
1625 bufio.seek(pos - 1, 0)
1626 bufio.seek(pos, 0)
1627 self.check_writes(_seekabs)
1628 def _seekrel(bufio):
1629 pos = bufio.seek(0, 1)
1630 bufio.seek(+1, 1)
1631 bufio.seek(-1, 1)
1632 bufio.seek(pos, 0)
1633 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001634
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001635 def test_writes_and_truncates(self):
1636 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001637
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001638 def test_write_non_blocking(self):
1639 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001640 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001641
Ezio Melottib3aedd42010-11-20 19:04:17 +00001642 self.assertEqual(bufio.write(b"abcd"), 4)
1643 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001644 # 1 byte will be written, the rest will be buffered
1645 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001646 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001647
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001648 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1649 raw.block_on(b"0")
1650 try:
1651 bufio.write(b"opqrwxyz0123456789")
1652 except self.BlockingIOError as e:
1653 written = e.characters_written
1654 else:
1655 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001656 self.assertEqual(written, 16)
1657 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001658 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001659
Ezio Melottib3aedd42010-11-20 19:04:17 +00001660 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001661 s = raw.pop_written()
1662 # Previously buffered bytes were flushed
1663 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001664
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001665 def test_write_and_rewind(self):
1666 raw = io.BytesIO()
1667 bufio = self.tp(raw, 4)
1668 self.assertEqual(bufio.write(b"abcdef"), 6)
1669 self.assertEqual(bufio.tell(), 6)
1670 bufio.seek(0, 0)
1671 self.assertEqual(bufio.write(b"XY"), 2)
1672 bufio.seek(6, 0)
1673 self.assertEqual(raw.getvalue(), b"XYcdef")
1674 self.assertEqual(bufio.write(b"123456"), 6)
1675 bufio.flush()
1676 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001677
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001678 def test_flush(self):
1679 writer = self.MockRawIO()
1680 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001681 bufio.write(b"abc")
1682 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001683 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001684
Antoine Pitrou131a4892012-10-16 22:57:11 +02001685 def test_writelines(self):
1686 l = [b'ab', b'cd', b'ef']
1687 writer = self.MockRawIO()
1688 bufio = self.tp(writer, 8)
1689 bufio.writelines(l)
1690 bufio.flush()
1691 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1692
1693 def test_writelines_userlist(self):
1694 l = UserList([b'ab', b'cd', b'ef'])
1695 writer = self.MockRawIO()
1696 bufio = self.tp(writer, 8)
1697 bufio.writelines(l)
1698 bufio.flush()
1699 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1700
1701 def test_writelines_error(self):
1702 writer = self.MockRawIO()
1703 bufio = self.tp(writer, 8)
1704 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1705 self.assertRaises(TypeError, bufio.writelines, None)
1706 self.assertRaises(TypeError, bufio.writelines, 'abc')
1707
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001708 def test_destructor(self):
1709 writer = self.MockRawIO()
1710 bufio = self.tp(writer, 8)
1711 bufio.write(b"abc")
1712 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001713 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001714 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001715
1716 def test_truncate(self):
1717 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001718 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001719 bufio = self.tp(raw, 8)
1720 bufio.write(b"abcdef")
1721 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001722 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001723 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001724 self.assertEqual(f.read(), b"abc")
1725
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001726 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001727 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001728 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001729 # Write out many bytes from many threads and test they were
1730 # all flushed.
1731 N = 1000
1732 contents = bytes(range(256)) * N
1733 sizes = cycle([1, 19])
1734 n = 0
1735 queue = deque()
1736 while n < len(contents):
1737 size = next(sizes)
1738 queue.append(contents[n:n+size])
1739 n += size
1740 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001741 # We use a real file object because it allows us to
1742 # exercise situations where the GIL is released before
1743 # writing the buffer to the raw streams. This is in addition
1744 # to concurrency issues due to switching threads in the middle
1745 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001746 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001747 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001748 errors = []
1749 def f():
1750 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001751 while True:
1752 try:
1753 s = queue.popleft()
1754 except IndexError:
1755 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001756 bufio.write(s)
1757 except Exception as e:
1758 errors.append(e)
1759 raise
1760 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001761 with support.start_threads(threads):
1762 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001763 self.assertFalse(errors,
1764 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001765 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001766 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001767 s = f.read()
1768 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001769 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001770 finally:
1771 support.unlink(support.TESTFN)
1772
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001773 def test_misbehaved_io(self):
1774 rawio = self.MisbehavedRawIO()
1775 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001776 self.assertRaises(OSError, bufio.seek, 0)
1777 self.assertRaises(OSError, bufio.tell)
1778 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001779
Florent Xicluna109d5732012-07-07 17:03:22 +02001780 def test_max_buffer_size_removal(self):
1781 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001782 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001783
Benjamin Peterson68623612012-12-20 11:53:11 -06001784 def test_write_error_on_close(self):
1785 raw = self.MockRawIO()
1786 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001787 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001788 raw.write = bad_write
1789 b = self.tp(raw)
1790 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001791 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001792 self.assertTrue(b.closed)
1793
benfogle9703f092017-11-10 16:03:40 -05001794 def test_slow_close_from_thread(self):
1795 # Issue #31976
1796 rawio = self.SlowFlushRawIO()
1797 bufio = self.tp(rawio, 8)
1798 t = threading.Thread(target=bufio.close)
1799 t.start()
1800 rawio.in_flush.wait()
1801 self.assertRaises(ValueError, bufio.write, b'spam')
1802 self.assertTrue(bufio.closed)
1803 t.join()
1804
1805
Benjamin Peterson59406a92009-03-26 17:10:29 +00001806
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001807class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001808 tp = io.BufferedWriter
1809
1810 def test_constructor(self):
1811 BufferedWriterTest.test_constructor(self)
1812 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08001813 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001814 if sys.maxsize > 0x7FFFFFFF:
1815 rawio = self.MockRawIO()
1816 bufio = self.tp(rawio)
1817 self.assertRaises((OverflowError, MemoryError, ValueError),
1818 bufio.__init__, rawio, sys.maxsize)
1819
1820 def test_initialization(self):
1821 rawio = self.MockRawIO()
1822 bufio = self.tp(rawio)
1823 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1824 self.assertRaises(ValueError, bufio.write, b"def")
1825 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1826 self.assertRaises(ValueError, bufio.write, b"def")
1827 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1828 self.assertRaises(ValueError, bufio.write, b"def")
1829
1830 def test_garbage_collection(self):
1831 # C BufferedWriter objects are collected, and collecting them flushes
1832 # all data to disk.
1833 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001834 with support.check_warnings(('', ResourceWarning)):
1835 rawio = self.FileIO(support.TESTFN, "w+b")
1836 f = self.tp(rawio)
1837 f.write(b"123xxx")
1838 f.x = f
1839 wr = weakref.ref(f)
1840 del f
1841 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001842 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001843 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001844 self.assertEqual(f.read(), b"123xxx")
1845
R David Murray67bfe802013-02-23 21:51:05 -05001846 def test_args_error(self):
1847 # Issue #17275
1848 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1849 self.tp(io.BytesIO(), 1024, 1024, 1024)
1850
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001851
1852class PyBufferedWriterTest(BufferedWriterTest):
1853 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001854
Guido van Rossum01a27522007-03-07 01:00:12 +00001855class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001856
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001857 def test_constructor(self):
1858 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001859 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001860
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001861 def test_uninitialized(self):
1862 pair = self.tp.__new__(self.tp)
1863 del pair
1864 pair = self.tp.__new__(self.tp)
1865 self.assertRaisesRegex((ValueError, AttributeError),
1866 'uninitialized|has no attribute',
1867 pair.read, 0)
1868 self.assertRaisesRegex((ValueError, AttributeError),
1869 'uninitialized|has no attribute',
1870 pair.write, b'')
1871 pair.__init__(self.MockRawIO(), self.MockRawIO())
1872 self.assertEqual(pair.read(0), b'')
1873 self.assertEqual(pair.write(b''), 0)
1874
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001875 def test_detach(self):
1876 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1877 self.assertRaises(self.UnsupportedOperation, pair.detach)
1878
Florent Xicluna109d5732012-07-07 17:03:22 +02001879 def test_constructor_max_buffer_size_removal(self):
1880 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001881 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001882
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001883 def test_constructor_with_not_readable(self):
1884 class NotReadable(MockRawIO):
1885 def readable(self):
1886 return False
1887
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001888 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001889
1890 def test_constructor_with_not_writeable(self):
1891 class NotWriteable(MockRawIO):
1892 def writable(self):
1893 return False
1894
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001895 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001896
1897 def test_read(self):
1898 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1899
1900 self.assertEqual(pair.read(3), b"abc")
1901 self.assertEqual(pair.read(1), b"d")
1902 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001903 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1904 self.assertEqual(pair.read(None), b"abc")
1905
1906 def test_readlines(self):
1907 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1908 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1909 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1910 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001911
1912 def test_read1(self):
1913 # .read1() is delegated to the underlying reader object, so this test
1914 # can be shallow.
1915 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1916
1917 self.assertEqual(pair.read1(3), b"abc")
Martin Panterccb2c0e2016-10-20 23:48:14 +00001918 self.assertEqual(pair.read1(), b"def")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001919
1920 def test_readinto(self):
Martin Panter6bb91f32016-05-28 00:41:57 +00001921 for method in ("readinto", "readinto1"):
1922 with self.subTest(method):
1923 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001924
Serhiy Storchaka5f1a5182016-09-11 14:41:02 +03001925 data = byteslike(b'\0' * 5)
Martin Panter6bb91f32016-05-28 00:41:57 +00001926 self.assertEqual(getattr(pair, method)(data), 5)
1927 self.assertEqual(bytes(data), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001928
1929 def test_write(self):
1930 w = self.MockRawIO()
1931 pair = self.tp(self.MockRawIO(), w)
1932
1933 pair.write(b"abc")
1934 pair.flush()
Martin Panter6bb91f32016-05-28 00:41:57 +00001935 buffer = bytearray(b"def")
1936 pair.write(buffer)
1937 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001938 pair.flush()
1939 self.assertEqual(w._write_stack, [b"abc", b"def"])
1940
1941 def test_peek(self):
1942 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1943
1944 self.assertTrue(pair.peek(3).startswith(b"abc"))
1945 self.assertEqual(pair.read(3), b"abc")
1946
1947 def test_readable(self):
1948 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1949 self.assertTrue(pair.readable())
1950
1951 def test_writeable(self):
1952 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1953 self.assertTrue(pair.writable())
1954
1955 def test_seekable(self):
1956 # BufferedRWPairs are never seekable, even if their readers and writers
1957 # are.
1958 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1959 self.assertFalse(pair.seekable())
1960
1961 # .flush() is delegated to the underlying writer object and has been
1962 # tested in the test_write method.
1963
1964 def test_close_and_closed(self):
1965 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1966 self.assertFalse(pair.closed)
1967 pair.close()
1968 self.assertTrue(pair.closed)
1969
Serhiy Storchaka7665be62015-03-24 23:21:57 +02001970 def test_reader_close_error_on_close(self):
1971 def reader_close():
1972 reader_non_existing
1973 reader = self.MockRawIO()
1974 reader.close = reader_close
1975 writer = self.MockRawIO()
1976 pair = self.tp(reader, writer)
1977 with self.assertRaises(NameError) as err:
1978 pair.close()
1979 self.assertIn('reader_non_existing', str(err.exception))
1980 self.assertTrue(pair.closed)
1981 self.assertFalse(reader.closed)
1982 self.assertTrue(writer.closed)
1983
1984 def test_writer_close_error_on_close(self):
1985 def writer_close():
1986 writer_non_existing
1987 reader = self.MockRawIO()
1988 writer = self.MockRawIO()
1989 writer.close = writer_close
1990 pair = self.tp(reader, writer)
1991 with self.assertRaises(NameError) as err:
1992 pair.close()
1993 self.assertIn('writer_non_existing', str(err.exception))
1994 self.assertFalse(pair.closed)
1995 self.assertTrue(reader.closed)
1996 self.assertFalse(writer.closed)
1997
1998 def test_reader_writer_close_error_on_close(self):
1999 def reader_close():
2000 reader_non_existing
2001 def writer_close():
2002 writer_non_existing
2003 reader = self.MockRawIO()
2004 reader.close = reader_close
2005 writer = self.MockRawIO()
2006 writer.close = writer_close
2007 pair = self.tp(reader, writer)
2008 with self.assertRaises(NameError) as err:
2009 pair.close()
2010 self.assertIn('reader_non_existing', str(err.exception))
2011 self.assertIsInstance(err.exception.__context__, NameError)
2012 self.assertIn('writer_non_existing', str(err.exception.__context__))
2013 self.assertFalse(pair.closed)
2014 self.assertFalse(reader.closed)
2015 self.assertFalse(writer.closed)
2016
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002017 def test_isatty(self):
2018 class SelectableIsAtty(MockRawIO):
2019 def __init__(self, isatty):
2020 MockRawIO.__init__(self)
2021 self._isatty = isatty
2022
2023 def isatty(self):
2024 return self._isatty
2025
2026 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
2027 self.assertFalse(pair.isatty())
2028
2029 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
2030 self.assertTrue(pair.isatty())
2031
2032 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
2033 self.assertTrue(pair.isatty())
2034
2035 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
2036 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00002037
Benjamin Petersonbbd0a322014-09-29 22:46:57 -04002038 def test_weakref_clearing(self):
2039 brw = self.tp(self.MockRawIO(), self.MockRawIO())
2040 ref = weakref.ref(brw)
2041 brw = None
2042 ref = None # Shouldn't segfault.
2043
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002044class CBufferedRWPairTest(BufferedRWPairTest):
2045 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002046
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002047class PyBufferedRWPairTest(BufferedRWPairTest):
2048 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002049
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002050
2051class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
2052 read_mode = "rb+"
2053 write_mode = "wb+"
2054
2055 def test_constructor(self):
2056 BufferedReaderTest.test_constructor(self)
2057 BufferedWriterTest.test_constructor(self)
2058
Serhiy Storchaka61e24932014-02-12 10:52:35 +02002059 def test_uninitialized(self):
2060 BufferedReaderTest.test_uninitialized(self)
2061 BufferedWriterTest.test_uninitialized(self)
2062
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002063 def test_read_and_write(self):
2064 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00002065 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00002066
2067 self.assertEqual(b"as", rw.read(2))
2068 rw.write(b"ddd")
2069 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002070 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002071 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002072 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00002073
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002074 def test_seek_and_tell(self):
2075 raw = self.BytesIO(b"asdfghjkl")
2076 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00002077
Ezio Melottib3aedd42010-11-20 19:04:17 +00002078 self.assertEqual(b"as", rw.read(2))
2079 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002080 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002081 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00002082
Antoine Pitroue05565e2011-08-20 14:39:23 +02002083 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00002084 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02002085 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002086 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002087 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002088 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002089 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002090 self.assertEqual(7, rw.tell())
2091 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02002092 rw.flush()
2093 self.assertEqual(b"asdf123fl", raw.getvalue())
2094
Christian Heimes8e42a0a2007-11-08 18:04:45 +00002095 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00002096
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002097 def check_flush_and_read(self, read_func):
2098 raw = self.BytesIO(b"abcdefghi")
2099 bufio = self.tp(raw)
2100
Ezio Melottib3aedd42010-11-20 19:04:17 +00002101 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002102 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002103 self.assertEqual(b"ef", read_func(bufio, 2))
2104 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002105 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002106 self.assertEqual(6, bufio.tell())
2107 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002108 raw.seek(0, 0)
2109 raw.write(b"XYZ")
2110 # flush() resets the read buffer
2111 bufio.flush()
2112 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002113 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002114
2115 def test_flush_and_read(self):
2116 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2117
2118 def test_flush_and_readinto(self):
2119 def _readinto(bufio, n=-1):
2120 b = bytearray(n if n >= 0 else 9999)
2121 n = bufio.readinto(b)
2122 return bytes(b[:n])
2123 self.check_flush_and_read(_readinto)
2124
2125 def test_flush_and_peek(self):
2126 def _peek(bufio, n=-1):
2127 # This relies on the fact that the buffer can contain the whole
2128 # raw stream, otherwise peek() can return less.
2129 b = bufio.peek(n)
2130 if n != -1:
2131 b = b[:n]
2132 bufio.seek(len(b), 1)
2133 return b
2134 self.check_flush_and_read(_peek)
2135
2136 def test_flush_and_write(self):
2137 raw = self.BytesIO(b"abcdefghi")
2138 bufio = self.tp(raw)
2139
2140 bufio.write(b"123")
2141 bufio.flush()
2142 bufio.write(b"45")
2143 bufio.flush()
2144 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002145 self.assertEqual(b"12345fghi", raw.getvalue())
2146 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002147
2148 def test_threads(self):
2149 BufferedReaderTest.test_threads(self)
2150 BufferedWriterTest.test_threads(self)
2151
2152 def test_writes_and_peek(self):
2153 def _peek(bufio):
2154 bufio.peek(1)
2155 self.check_writes(_peek)
2156 def _peek(bufio):
2157 pos = bufio.tell()
2158 bufio.seek(-1, 1)
2159 bufio.peek(1)
2160 bufio.seek(pos, 0)
2161 self.check_writes(_peek)
2162
2163 def test_writes_and_reads(self):
2164 def _read(bufio):
2165 bufio.seek(-1, 1)
2166 bufio.read(1)
2167 self.check_writes(_read)
2168
2169 def test_writes_and_read1s(self):
2170 def _read1(bufio):
2171 bufio.seek(-1, 1)
2172 bufio.read1(1)
2173 self.check_writes(_read1)
2174
2175 def test_writes_and_readintos(self):
2176 def _read(bufio):
2177 bufio.seek(-1, 1)
2178 bufio.readinto(bytearray(1))
2179 self.check_writes(_read)
2180
Antoine Pitroua0ceb732009-08-06 20:29:56 +00002181 def test_write_after_readahead(self):
2182 # Issue #6629: writing after the buffer was filled by readahead should
2183 # first rewind the raw stream.
2184 for overwrite_size in [1, 5]:
2185 raw = self.BytesIO(b"A" * 10)
2186 bufio = self.tp(raw, 4)
2187 # Trigger readahead
2188 self.assertEqual(bufio.read(1), b"A")
2189 self.assertEqual(bufio.tell(), 1)
2190 # Overwriting should rewind the raw stream if it needs so
2191 bufio.write(b"B" * overwrite_size)
2192 self.assertEqual(bufio.tell(), overwrite_size + 1)
2193 # If the write size was smaller than the buffer size, flush() and
2194 # check that rewind happens.
2195 bufio.flush()
2196 self.assertEqual(bufio.tell(), overwrite_size + 1)
2197 s = raw.getvalue()
2198 self.assertEqual(s,
2199 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2200
Antoine Pitrou7c404892011-05-13 00:13:33 +02002201 def test_write_rewind_write(self):
2202 # Various combinations of reading / writing / seeking backwards / writing again
2203 def mutate(bufio, pos1, pos2):
2204 assert pos2 >= pos1
2205 # Fill the buffer
2206 bufio.seek(pos1)
2207 bufio.read(pos2 - pos1)
2208 bufio.write(b'\x02')
2209 # This writes earlier than the previous write, but still inside
2210 # the buffer.
2211 bufio.seek(pos1)
2212 bufio.write(b'\x01')
2213
2214 b = b"\x80\x81\x82\x83\x84"
2215 for i in range(0, len(b)):
2216 for j in range(i, len(b)):
2217 raw = self.BytesIO(b)
2218 bufio = self.tp(raw, 100)
2219 mutate(bufio, i, j)
2220 bufio.flush()
2221 expected = bytearray(b)
2222 expected[j] = 2
2223 expected[i] = 1
2224 self.assertEqual(raw.getvalue(), expected,
2225 "failed result for i=%d, j=%d" % (i, j))
2226
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00002227 def test_truncate_after_read_or_write(self):
2228 raw = self.BytesIO(b"A" * 10)
2229 bufio = self.tp(raw, 100)
2230 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2231 self.assertEqual(bufio.truncate(), 2)
2232 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2233 self.assertEqual(bufio.truncate(), 4)
2234
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002235 def test_misbehaved_io(self):
2236 BufferedReaderTest.test_misbehaved_io(self)
2237 BufferedWriterTest.test_misbehaved_io(self)
2238
Antoine Pitroue05565e2011-08-20 14:39:23 +02002239 def test_interleaved_read_write(self):
2240 # Test for issue #12213
2241 with self.BytesIO(b'abcdefgh') as raw:
2242 with self.tp(raw, 100) as f:
2243 f.write(b"1")
2244 self.assertEqual(f.read(1), b'b')
2245 f.write(b'2')
2246 self.assertEqual(f.read1(1), b'd')
2247 f.write(b'3')
2248 buf = bytearray(1)
2249 f.readinto(buf)
2250 self.assertEqual(buf, b'f')
2251 f.write(b'4')
2252 self.assertEqual(f.peek(1), b'h')
2253 f.flush()
2254 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2255
2256 with self.BytesIO(b'abc') as raw:
2257 with self.tp(raw, 100) as f:
2258 self.assertEqual(f.read(1), b'a')
2259 f.write(b"2")
2260 self.assertEqual(f.read(1), b'c')
2261 f.flush()
2262 self.assertEqual(raw.getvalue(), b'a2c')
2263
2264 def test_interleaved_readline_write(self):
2265 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2266 with self.tp(raw) as f:
2267 f.write(b'1')
2268 self.assertEqual(f.readline(), b'b\n')
2269 f.write(b'2')
2270 self.assertEqual(f.readline(), b'def\n')
2271 f.write(b'3')
2272 self.assertEqual(f.readline(), b'\n')
2273 f.flush()
2274 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2275
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002276 # You can't construct a BufferedRandom over a non-seekable stream.
2277 test_unseekable = None
2278
R David Murray67bfe802013-02-23 21:51:05 -05002279
Antoine Pitrou10f0c502012-07-29 19:02:46 +02002280class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002281 tp = io.BufferedRandom
2282
2283 def test_constructor(self):
2284 BufferedRandomTest.test_constructor(self)
2285 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08002286 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002287 if sys.maxsize > 0x7FFFFFFF:
2288 rawio = self.MockRawIO()
2289 bufio = self.tp(rawio)
2290 self.assertRaises((OverflowError, MemoryError, ValueError),
2291 bufio.__init__, rawio, sys.maxsize)
2292
2293 def test_garbage_collection(self):
2294 CBufferedReaderTest.test_garbage_collection(self)
2295 CBufferedWriterTest.test_garbage_collection(self)
2296
R David Murray67bfe802013-02-23 21:51:05 -05002297 def test_args_error(self):
2298 # Issue #17275
2299 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
2300 self.tp(io.BytesIO(), 1024, 1024, 1024)
2301
2302
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002303class PyBufferedRandomTest(BufferedRandomTest):
2304 tp = pyio.BufferedRandom
2305
2306
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002307# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2308# properties:
2309# - A single output character can correspond to many bytes of input.
2310# - The number of input bytes to complete the character can be
2311# undetermined until the last input byte is received.
2312# - The number of input bytes can vary depending on previous input.
2313# - A single input byte can correspond to many characters of output.
2314# - The number of output characters can be undetermined until the
2315# last input byte is received.
2316# - The number of output characters can vary depending on previous input.
2317
2318class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
2319 """
2320 For testing seek/tell behavior with a stateful, buffering decoder.
2321
2322 Input is a sequence of words. Words may be fixed-length (length set
2323 by input) or variable-length (period-terminated). In variable-length
2324 mode, extra periods are ignored. Possible words are:
2325 - 'i' followed by a number sets the input length, I (maximum 99).
2326 When I is set to 0, words are space-terminated.
2327 - 'o' followed by a number sets the output length, O (maximum 99).
2328 - Any other word is converted into a word followed by a period on
2329 the output. The output word consists of the input word truncated
2330 or padded out with hyphens to make its length equal to O. If O
2331 is 0, the word is output verbatim without truncating or padding.
2332 I and O are initially set to 1. When I changes, any buffered input is
2333 re-scanned according to the new I. EOF also terminates the last word.
2334 """
2335
2336 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00002337 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002338 self.reset()
2339
2340 def __repr__(self):
2341 return '<SID %x>' % id(self)
2342
2343 def reset(self):
2344 self.i = 1
2345 self.o = 1
2346 self.buffer = bytearray()
2347
2348 def getstate(self):
2349 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
2350 return bytes(self.buffer), i*100 + o
2351
2352 def setstate(self, state):
2353 buffer, io = state
2354 self.buffer = bytearray(buffer)
2355 i, o = divmod(io, 100)
2356 self.i, self.o = i ^ 1, o ^ 1
2357
2358 def decode(self, input, final=False):
2359 output = ''
2360 for b in input:
2361 if self.i == 0: # variable-length, terminated with period
2362 if b == ord('.'):
2363 if self.buffer:
2364 output += self.process_word()
2365 else:
2366 self.buffer.append(b)
2367 else: # fixed-length, terminate after self.i bytes
2368 self.buffer.append(b)
2369 if len(self.buffer) == self.i:
2370 output += self.process_word()
2371 if final and self.buffer: # EOF terminates the last word
2372 output += self.process_word()
2373 return output
2374
2375 def process_word(self):
2376 output = ''
2377 if self.buffer[0] == ord('i'):
2378 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2379 elif self.buffer[0] == ord('o'):
2380 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2381 else:
2382 output = self.buffer.decode('ascii')
2383 if len(output) < self.o:
2384 output += '-'*self.o # pad out with hyphens
2385 if self.o:
2386 output = output[:self.o] # truncate to output length
2387 output += '.'
2388 self.buffer = bytearray()
2389 return output
2390
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002391 codecEnabled = False
2392
2393 @classmethod
2394 def lookupTestDecoder(cls, name):
2395 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00002396 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002397 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00002398 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002399 incrementalencoder=None,
2400 streamreader=None, streamwriter=None,
2401 incrementaldecoder=cls)
2402
2403# Register the previous decoder for testing.
2404# Disabled by default, tests will enable it.
2405codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2406
2407
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002408class StatefulIncrementalDecoderTest(unittest.TestCase):
2409 """
2410 Make sure the StatefulIncrementalDecoder actually works.
2411 """
2412
2413 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002414 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002415 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002416 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002417 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002418 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002419 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002420 # I=0, O=6 (variable-length input, fixed-length output)
2421 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2422 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002423 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002424 # I=6, O=3 (fixed-length input > fixed-length output)
2425 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2426 # I=0, then 3; O=29, then 15 (with longer output)
2427 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2428 'a----------------------------.' +
2429 'b----------------------------.' +
2430 'cde--------------------------.' +
2431 'abcdefghijabcde.' +
2432 'a.b------------.' +
2433 '.c.------------.' +
2434 'd.e------------.' +
2435 'k--------------.' +
2436 'l--------------.' +
2437 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002438 ]
2439
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002440 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002441 # Try a few one-shot test cases.
2442 for input, eof, output in self.test_cases:
2443 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002444 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002445
2446 # Also test an unfinished decode, followed by forcing EOF.
2447 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002448 self.assertEqual(d.decode(b'oiabcd'), '')
2449 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002450
2451class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002452
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002453 def setUp(self):
2454 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2455 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002456 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002457
Guido van Rossumd0712812007-04-11 16:32:43 +00002458 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002459 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002460
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002461 def test_constructor(self):
2462 r = self.BytesIO(b"\xc3\xa9\n\n")
2463 b = self.BufferedReader(r, 1000)
2464 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002465 t.__init__(b, encoding="latin-1", newline="\r\n")
2466 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002467 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002468 t.__init__(b, encoding="utf-8", line_buffering=True)
2469 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002470 self.assertEqual(t.line_buffering, True)
2471 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002472 self.assertRaises(TypeError, t.__init__, b, newline=42)
2473 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2474
Serhiy Storchakaf24131f2015-04-16 11:19:43 +03002475 def test_uninitialized(self):
2476 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2477 del t
2478 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2479 self.assertRaises(Exception, repr, t)
2480 self.assertRaisesRegex((ValueError, AttributeError),
2481 'uninitialized|has no attribute',
2482 t.read, 0)
2483 t.__init__(self.MockRawIO())
2484 self.assertEqual(t.read(0), '')
2485
Nick Coghlana9b15242014-02-04 22:11:18 +10002486 def test_non_text_encoding_codecs_are_rejected(self):
2487 # Ensure the constructor complains if passed a codec that isn't
2488 # marked as a text encoding
2489 # http://bugs.python.org/issue20404
2490 r = self.BytesIO()
2491 b = self.BufferedWriter(r)
2492 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2493 self.TextIOWrapper(b, encoding="hex")
2494
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002495 def test_detach(self):
2496 r = self.BytesIO()
2497 b = self.BufferedWriter(r)
2498 t = self.TextIOWrapper(b)
2499 self.assertIs(t.detach(), b)
2500
2501 t = self.TextIOWrapper(b, encoding="ascii")
2502 t.write("howdy")
2503 self.assertFalse(r.getvalue())
2504 t.detach()
2505 self.assertEqual(r.getvalue(), b"howdy")
2506 self.assertRaises(ValueError, t.detach)
2507
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002508 # Operations independent of the detached stream should still work
2509 repr(t)
2510 self.assertEqual(t.encoding, "ascii")
2511 self.assertEqual(t.errors, "strict")
2512 self.assertFalse(t.line_buffering)
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002513 self.assertFalse(t.write_through)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002514
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002515 def test_repr(self):
2516 raw = self.BytesIO("hello".encode("utf-8"))
2517 b = self.BufferedReader(raw)
2518 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002519 modname = self.TextIOWrapper.__module__
2520 self.assertEqual(repr(t),
2521 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2522 raw.name = "dummy"
2523 self.assertEqual(repr(t),
2524 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002525 t.mode = "r"
2526 self.assertEqual(repr(t),
2527 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002528 raw.name = b"dummy"
2529 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002530 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002531
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002532 t.buffer.detach()
2533 repr(t) # Should not raise an exception
2534
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02002535 def test_recursive_repr(self):
2536 # Issue #25455
2537 raw = self.BytesIO()
2538 t = self.TextIOWrapper(raw)
2539 with support.swap_attr(raw, 'name', t):
2540 try:
2541 repr(t) # Should not crash
2542 except RuntimeError:
2543 pass
2544
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002545 def test_line_buffering(self):
2546 r = self.BytesIO()
2547 b = self.BufferedWriter(r, 1000)
2548 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002549 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002550 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002551 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002552 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002553 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002554 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002555
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002556 def test_reconfigure_line_buffering(self):
2557 r = self.BytesIO()
2558 b = self.BufferedWriter(r, 1000)
2559 t = self.TextIOWrapper(b, newline="\n", line_buffering=False)
2560 t.write("AB\nC")
2561 self.assertEqual(r.getvalue(), b"")
2562
2563 t.reconfigure(line_buffering=True) # implicit flush
2564 self.assertEqual(r.getvalue(), b"AB\nC")
2565 t.write("DEF\nG")
2566 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2567 t.write("H")
2568 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2569 t.reconfigure(line_buffering=False) # implicit flush
2570 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2571 t.write("IJ")
2572 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2573
2574 # Keeping default value
2575 t.reconfigure()
2576 t.reconfigure(line_buffering=None)
2577 self.assertEqual(t.line_buffering, False)
2578 t.reconfigure(line_buffering=True)
2579 t.reconfigure()
2580 t.reconfigure(line_buffering=None)
2581 self.assertEqual(t.line_buffering, True)
2582
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002583 def test_default_encoding(self):
2584 old_environ = dict(os.environ)
2585 try:
2586 # try to get a user preferred encoding different than the current
2587 # locale encoding to check that TextIOWrapper() uses the current
2588 # locale encoding and not the user preferred encoding
2589 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2590 if key in os.environ:
2591 del os.environ[key]
2592
2593 current_locale_encoding = locale.getpreferredencoding(False)
2594 b = self.BytesIO()
2595 t = self.TextIOWrapper(b)
2596 self.assertEqual(t.encoding, current_locale_encoding)
2597 finally:
2598 os.environ.clear()
2599 os.environ.update(old_environ)
2600
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002601 @support.cpython_only
Serhiy Storchaka78980432013-01-15 01:12:17 +02002602 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002603 # Issue 15989
2604 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002605 b = self.BytesIO()
2606 b.fileno = lambda: _testcapi.INT_MAX + 1
2607 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2608 b.fileno = lambda: _testcapi.UINT_MAX + 1
2609 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2610
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002611 def test_encoding(self):
2612 # Check the encoding attribute is always set, and valid
2613 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002614 t = self.TextIOWrapper(b, encoding="utf-8")
2615 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002616 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002617 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002618 codecs.lookup(t.encoding)
2619
2620 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002621 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002622 b = self.BytesIO(b"abc\n\xff\n")
2623 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002624 self.assertRaises(UnicodeError, t.read)
2625 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002626 b = self.BytesIO(b"abc\n\xff\n")
2627 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002628 self.assertRaises(UnicodeError, t.read)
2629 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002630 b = self.BytesIO(b"abc\n\xff\n")
2631 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002632 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002633 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002634 b = self.BytesIO(b"abc\n\xff\n")
2635 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002636 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002637
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002638 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002639 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002640 b = self.BytesIO()
2641 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002642 self.assertRaises(UnicodeError, t.write, "\xff")
2643 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002644 b = self.BytesIO()
2645 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002646 self.assertRaises(UnicodeError, t.write, "\xff")
2647 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002648 b = self.BytesIO()
2649 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002650 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002651 t.write("abc\xffdef\n")
2652 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002653 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002654 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002655 b = self.BytesIO()
2656 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002657 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002658 t.write("abc\xffdef\n")
2659 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002660 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002661
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002662 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002663 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2664
2665 tests = [
2666 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002667 [ '', input_lines ],
2668 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2669 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2670 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002671 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002672 encodings = (
2673 'utf-8', 'latin-1',
2674 'utf-16', 'utf-16-le', 'utf-16-be',
2675 'utf-32', 'utf-32-le', 'utf-32-be',
2676 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002677
Guido van Rossum8358db22007-08-18 21:39:55 +00002678 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002679 # character in TextIOWrapper._pending_line.
2680 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002681 # XXX: str.encode() should return bytes
2682 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002683 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002684 for bufsize in range(1, 10):
2685 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002686 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2687 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002688 encoding=encoding)
2689 if do_reads:
2690 got_lines = []
2691 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002692 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002693 if c2 == '':
2694 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002695 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002696 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002697 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002698 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002699
2700 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002701 self.assertEqual(got_line, exp_line)
2702 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002703
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002704 def test_newlines_input(self):
2705 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002706 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2707 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002708 (None, normalized.decode("ascii").splitlines(keepends=True)),
2709 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002710 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2711 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2712 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002713 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002714 buf = self.BytesIO(testdata)
2715 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002716 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002717 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002718 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002719
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002720 def test_newlines_output(self):
2721 testdict = {
2722 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2723 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2724 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2725 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2726 }
2727 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2728 for newline, expected in tests:
2729 buf = self.BytesIO()
2730 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2731 txt.write("AAA\nB")
2732 txt.write("BB\nCCC\n")
2733 txt.write("X\rY\r\nZ")
2734 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002735 self.assertEqual(buf.closed, False)
2736 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002737
2738 def test_destructor(self):
2739 l = []
2740 base = self.BytesIO
2741 class MyBytesIO(base):
2742 def close(self):
2743 l.append(self.getvalue())
2744 base.close(self)
2745 b = MyBytesIO()
2746 t = self.TextIOWrapper(b, encoding="ascii")
2747 t.write("abc")
2748 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002749 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002750 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002751
2752 def test_override_destructor(self):
2753 record = []
2754 class MyTextIO(self.TextIOWrapper):
2755 def __del__(self):
2756 record.append(1)
2757 try:
2758 f = super().__del__
2759 except AttributeError:
2760 pass
2761 else:
2762 f()
2763 def close(self):
2764 record.append(2)
2765 super().close()
2766 def flush(self):
2767 record.append(3)
2768 super().flush()
2769 b = self.BytesIO()
2770 t = MyTextIO(b, encoding="ascii")
2771 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002772 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002773 self.assertEqual(record, [1, 2, 3])
2774
2775 def test_error_through_destructor(self):
2776 # Test that the exception state is not modified by a destructor,
2777 # even if close() fails.
2778 rawio = self.CloseFailureIO()
2779 def f():
2780 self.TextIOWrapper(rawio).xyzzy
2781 with support.captured_output("stderr") as s:
2782 self.assertRaises(AttributeError, f)
2783 s = s.getvalue().strip()
2784 if s:
2785 # The destructor *may* have printed an unraisable error, check it
2786 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002787 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002788 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002789
Guido van Rossum9b76da62007-04-11 01:09:03 +00002790 # Systematic tests of the text I/O API
2791
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002792 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002793 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002794 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002795 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002796 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002797 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002798 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002799 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002800 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002801 self.assertEqual(f.tell(), 0)
2802 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002803 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002804 self.assertEqual(f.seek(0), 0)
2805 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002806 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002807 self.assertEqual(f.read(2), "ab")
2808 self.assertEqual(f.read(1), "c")
2809 self.assertEqual(f.read(1), "")
2810 self.assertEqual(f.read(), "")
2811 self.assertEqual(f.tell(), cookie)
2812 self.assertEqual(f.seek(0), 0)
2813 self.assertEqual(f.seek(0, 2), cookie)
2814 self.assertEqual(f.write("def"), 3)
2815 self.assertEqual(f.seek(cookie), cookie)
2816 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002817 if enc.startswith("utf"):
2818 self.multi_line_test(f, enc)
2819 f.close()
2820
2821 def multi_line_test(self, f, enc):
2822 f.seek(0)
2823 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002824 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002825 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002826 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002827 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002828 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002829 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002830 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002831 wlines.append((f.tell(), line))
2832 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002833 f.seek(0)
2834 rlines = []
2835 while True:
2836 pos = f.tell()
2837 line = f.readline()
2838 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002839 break
2840 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002841 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002842
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002843 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002844 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002845 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002846 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002847 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002848 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002849 p2 = f.tell()
2850 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002851 self.assertEqual(f.tell(), p0)
2852 self.assertEqual(f.readline(), "\xff\n")
2853 self.assertEqual(f.tell(), p1)
2854 self.assertEqual(f.readline(), "\xff\n")
2855 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002856 f.seek(0)
2857 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002858 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002859 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002860 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002861 f.close()
2862
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002863 def test_seeking(self):
2864 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002865 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002866 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002867 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002868 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002869 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002870 suffix = bytes(u_suffix.encode("utf-8"))
2871 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002872 with self.open(support.TESTFN, "wb") as f:
2873 f.write(line*2)
2874 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2875 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002876 self.assertEqual(s, str(prefix, "ascii"))
2877 self.assertEqual(f.tell(), prefix_size)
2878 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002879
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002880 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002881 # Regression test for a specific bug
2882 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002883 with self.open(support.TESTFN, "wb") as f:
2884 f.write(data)
2885 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2886 f._CHUNK_SIZE # Just test that it exists
2887 f._CHUNK_SIZE = 2
2888 f.readline()
2889 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002890
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002891 def test_seek_and_tell(self):
2892 #Test seek/tell using the StatefulIncrementalDecoder.
2893 # Make test faster by doing smaller seeks
2894 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002895
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002896 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002897 """Tell/seek to various points within a data stream and ensure
2898 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002899 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002900 f.write(data)
2901 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002902 f = self.open(support.TESTFN, encoding='test_decoder')
2903 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002904 decoded = f.read()
2905 f.close()
2906
Neal Norwitze2b07052008-03-18 19:52:05 +00002907 for i in range(min_pos, len(decoded) + 1): # seek positions
2908 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002909 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002910 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002911 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002912 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002913 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002914 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002915 f.close()
2916
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002917 # Enable the test decoder.
2918 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002919
2920 # Run the tests.
2921 try:
2922 # Try each test case.
2923 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002924 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002925
2926 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002927 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2928 offset = CHUNK_SIZE - len(input)//2
2929 prefix = b'.'*offset
2930 # Don't bother seeking into the prefix (takes too long).
2931 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002932 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002933
2934 # Ensure our test decoder won't interfere with subsequent tests.
2935 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002936 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002937
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002938 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002939 data = "1234567890"
2940 tests = ("utf-16",
2941 "utf-16-le",
2942 "utf-16-be",
2943 "utf-32",
2944 "utf-32-le",
2945 "utf-32-be")
2946 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002947 buf = self.BytesIO()
2948 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002949 # Check if the BOM is written only once (see issue1753).
2950 f.write(data)
2951 f.write(data)
2952 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002953 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002954 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002955 self.assertEqual(f.read(), data * 2)
2956 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002957
Benjamin Petersona1b49012009-03-31 23:11:32 +00002958 def test_unreadable(self):
2959 class UnReadable(self.BytesIO):
2960 def readable(self):
2961 return False
2962 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002963 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002964
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002965 def test_read_one_by_one(self):
2966 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002967 reads = ""
2968 while True:
2969 c = txt.read(1)
2970 if not c:
2971 break
2972 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002973 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002974
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002975 def test_readlines(self):
2976 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2977 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2978 txt.seek(0)
2979 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2980 txt.seek(0)
2981 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2982
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002983 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002984 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002985 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002986 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002987 reads = ""
2988 while True:
2989 c = txt.read(128)
2990 if not c:
2991 break
2992 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002993 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002994
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002995 def test_writelines(self):
2996 l = ['ab', 'cd', 'ef']
2997 buf = self.BytesIO()
2998 txt = self.TextIOWrapper(buf)
2999 txt.writelines(l)
3000 txt.flush()
3001 self.assertEqual(buf.getvalue(), b'abcdef')
3002
3003 def test_writelines_userlist(self):
3004 l = UserList(['ab', 'cd', 'ef'])
3005 buf = self.BytesIO()
3006 txt = self.TextIOWrapper(buf)
3007 txt.writelines(l)
3008 txt.flush()
3009 self.assertEqual(buf.getvalue(), b'abcdef')
3010
3011 def test_writelines_error(self):
3012 txt = self.TextIOWrapper(self.BytesIO())
3013 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3014 self.assertRaises(TypeError, txt.writelines, None)
3015 self.assertRaises(TypeError, txt.writelines, b'abc')
3016
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003017 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003018 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003019
3020 # read one char at a time
3021 reads = ""
3022 while True:
3023 c = txt.read(1)
3024 if not c:
3025 break
3026 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003027 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003028
3029 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003030 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003031 txt._CHUNK_SIZE = 4
3032
3033 reads = ""
3034 while True:
3035 c = txt.read(4)
3036 if not c:
3037 break
3038 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003039 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003040
3041 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003042 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003043 txt._CHUNK_SIZE = 4
3044
3045 reads = txt.read(4)
3046 reads += txt.read(4)
3047 reads += txt.readline()
3048 reads += txt.readline()
3049 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003050 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003051
3052 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003053 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003054 txt._CHUNK_SIZE = 4
3055
3056 reads = txt.read(4)
3057 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003058 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003059
3060 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003061 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003062 txt._CHUNK_SIZE = 4
3063
3064 reads = txt.read(4)
3065 pos = txt.tell()
3066 txt.seek(0)
3067 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003068 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003069
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003070 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003071 buffer = self.BytesIO(self.testdata)
3072 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003073
3074 self.assertEqual(buffer.seekable(), txt.seekable())
3075
Antoine Pitroue4501852009-05-14 18:55:55 +00003076 def test_append_bom(self):
3077 # The BOM is not written again when appending to a non-empty file
3078 filename = support.TESTFN
3079 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3080 with self.open(filename, 'w', encoding=charset) as f:
3081 f.write('aaa')
3082 pos = f.tell()
3083 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003084 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003085
3086 with self.open(filename, 'a', encoding=charset) as f:
3087 f.write('xxx')
3088 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003089 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003090
3091 def test_seek_bom(self):
3092 # Same test, but when seeking manually
3093 filename = support.TESTFN
3094 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3095 with self.open(filename, 'w', encoding=charset) as f:
3096 f.write('aaa')
3097 pos = f.tell()
3098 with self.open(filename, 'r+', encoding=charset) as f:
3099 f.seek(pos)
3100 f.write('zzz')
3101 f.seek(0)
3102 f.write('bbb')
3103 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003104 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003105
Antoine Pitrou85e3ee72015-04-13 20:01:21 +02003106 def test_seek_append_bom(self):
3107 # Same test, but first seek to the start and then to the end
3108 filename = support.TESTFN
3109 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3110 with self.open(filename, 'w', encoding=charset) as f:
3111 f.write('aaa')
3112 with self.open(filename, 'a', encoding=charset) as f:
3113 f.seek(0)
3114 f.seek(0, self.SEEK_END)
3115 f.write('xxx')
3116 with self.open(filename, 'rb') as f:
3117 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3118
Benjamin Peterson0926ad12009-06-06 18:02:12 +00003119 def test_errors_property(self):
3120 with self.open(support.TESTFN, "w") as f:
3121 self.assertEqual(f.errors, "strict")
3122 with self.open(support.TESTFN, "w", errors="replace") as f:
3123 self.assertEqual(f.errors, "replace")
3124
Brett Cannon31f59292011-02-21 19:29:56 +00003125 @support.no_tracing
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003126 def test_threads_write(self):
3127 # Issue6750: concurrent writes could duplicate data
3128 event = threading.Event()
3129 with self.open(support.TESTFN, "w", buffering=1) as f:
3130 def run(n):
3131 text = "Thread%03d\n" % n
3132 event.wait()
3133 f.write(text)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003134 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003135 for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003136 with support.start_threads(threads, event.set):
3137 time.sleep(0.02)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003138 with self.open(support.TESTFN) as f:
3139 content = f.read()
3140 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003141 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003142
Antoine Pitrou6be88762010-05-03 16:48:20 +00003143 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003144 # Test that text file is closed despite failed flush
3145 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00003146 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003147 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00003148 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003149 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003150 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00003151 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003152 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06003153 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003154 self.assertTrue(txt.buffer.closed)
3155 self.assertTrue(closed) # flush() called
3156 self.assertFalse(closed[0]) # flush() called before file closed
3157 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02003158 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003159
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003160 def test_close_error_on_close(self):
3161 buffer = self.BytesIO(self.testdata)
3162 def bad_flush():
3163 raise OSError('flush')
3164 def bad_close():
3165 raise OSError('close')
3166 buffer.close = bad_close
3167 txt = self.TextIOWrapper(buffer, encoding="ascii")
3168 txt.flush = bad_flush
3169 with self.assertRaises(OSError) as err: # exception not swallowed
3170 txt.close()
3171 self.assertEqual(err.exception.args, ('close',))
3172 self.assertIsInstance(err.exception.__context__, OSError)
3173 self.assertEqual(err.exception.__context__.args, ('flush',))
3174 self.assertFalse(txt.closed)
3175
3176 def test_nonnormalized_close_error_on_close(self):
3177 # Issue #21677
3178 buffer = self.BytesIO(self.testdata)
3179 def bad_flush():
3180 raise non_existing_flush
3181 def bad_close():
3182 raise non_existing_close
3183 buffer.close = bad_close
3184 txt = self.TextIOWrapper(buffer, encoding="ascii")
3185 txt.flush = bad_flush
3186 with self.assertRaises(NameError) as err: # exception not swallowed
3187 txt.close()
3188 self.assertIn('non_existing_close', str(err.exception))
3189 self.assertIsInstance(err.exception.__context__, NameError)
3190 self.assertIn('non_existing_flush', str(err.exception.__context__))
3191 self.assertFalse(txt.closed)
3192
Antoine Pitrou6be88762010-05-03 16:48:20 +00003193 def test_multi_close(self):
3194 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3195 txt.close()
3196 txt.close()
3197 txt.close()
3198 self.assertRaises(ValueError, txt.flush)
3199
Antoine Pitrou0d739d72010-09-05 23:01:12 +00003200 def test_unseekable(self):
3201 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
3202 self.assertRaises(self.UnsupportedOperation, txt.tell)
3203 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3204
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003205 def test_readonly_attributes(self):
3206 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3207 buf = self.BytesIO(self.testdata)
3208 with self.assertRaises(AttributeError):
3209 txt.buffer = buf
3210
Antoine Pitroue96ec682011-07-23 21:46:35 +02003211 def test_rawio(self):
3212 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3213 # that subprocess.Popen() can have the required unbuffered
3214 # semantics with universal_newlines=True.
3215 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3216 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3217 # Reads
3218 self.assertEqual(txt.read(4), 'abcd')
3219 self.assertEqual(txt.readline(), 'efghi\n')
3220 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3221
3222 def test_rawio_write_through(self):
3223 # Issue #12591: with write_through=True, writes don't need a flush
3224 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3225 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3226 write_through=True)
3227 txt.write('1')
3228 txt.write('23\n4')
3229 txt.write('5')
3230 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3231
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003232 def test_bufio_write_through(self):
3233 # Issue #21396: write_through=True doesn't force a flush()
3234 # on the underlying binary buffered object.
3235 flush_called, write_called = [], []
3236 class BufferedWriter(self.BufferedWriter):
3237 def flush(self, *args, **kwargs):
3238 flush_called.append(True)
3239 return super().flush(*args, **kwargs)
3240 def write(self, *args, **kwargs):
3241 write_called.append(True)
3242 return super().write(*args, **kwargs)
3243
3244 rawio = self.BytesIO()
3245 data = b"a"
3246 bufio = BufferedWriter(rawio, len(data)*2)
3247 textio = self.TextIOWrapper(bufio, encoding='ascii',
3248 write_through=True)
3249 # write to the buffered io but don't overflow the buffer
3250 text = data.decode('ascii')
3251 textio.write(text)
3252
3253 # buffer.flush is not called with write_through=True
3254 self.assertFalse(flush_called)
3255 # buffer.write *is* called with write_through=True
3256 self.assertTrue(write_called)
3257 self.assertEqual(rawio.getvalue(), b"") # no flush
3258
3259 write_called = [] # reset
3260 textio.write(text * 10) # total content is larger than bufio buffer
3261 self.assertTrue(write_called)
3262 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3263
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02003264 def test_reconfigure_write_through(self):
3265 raw = self.MockRawIO([])
3266 t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3267 t.write('1')
3268 t.reconfigure(write_through=True) # implied flush
3269 self.assertEqual(t.write_through, True)
3270 self.assertEqual(b''.join(raw._write_stack), b'1')
3271 t.write('23')
3272 self.assertEqual(b''.join(raw._write_stack), b'123')
3273 t.reconfigure(write_through=False)
3274 self.assertEqual(t.write_through, False)
3275 t.write('45')
3276 t.flush()
3277 self.assertEqual(b''.join(raw._write_stack), b'12345')
3278 # Keeping default value
3279 t.reconfigure()
3280 t.reconfigure(write_through=None)
3281 self.assertEqual(t.write_through, False)
3282 t.reconfigure(write_through=True)
3283 t.reconfigure()
3284 t.reconfigure(write_through=None)
3285 self.assertEqual(t.write_through, True)
3286
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003287 def test_read_nonbytes(self):
3288 # Issue #17106
3289 # Crash when underlying read() returns non-bytes
3290 t = self.TextIOWrapper(self.StringIO('a'))
3291 self.assertRaises(TypeError, t.read, 1)
3292 t = self.TextIOWrapper(self.StringIO('a'))
3293 self.assertRaises(TypeError, t.readline)
3294 t = self.TextIOWrapper(self.StringIO('a'))
3295 self.assertRaises(TypeError, t.read)
3296
Oren Milmana5b4ea12017-08-25 21:14:54 +03003297 def test_illegal_encoder(self):
3298 # Issue 31271: Calling write() while the return value of encoder's
3299 # encode() is invalid shouldn't cause an assertion failure.
3300 rot13 = codecs.lookup("rot13")
3301 with support.swap_attr(rot13, '_is_text_encoding', True):
3302 t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
3303 self.assertRaises(TypeError, t.write, 'bar')
3304
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003305 def test_illegal_decoder(self):
3306 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10003307 # Bypass the early encoding check added in issue 20404
3308 def _make_illegal_wrapper():
3309 quopri = codecs.lookup("quopri")
3310 quopri._is_text_encoding = True
3311 try:
3312 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3313 newline='\n', encoding="quopri")
3314 finally:
3315 quopri._is_text_encoding = False
3316 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003317 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10003318 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003319 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10003320 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003321 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10003322 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003323 self.assertRaises(TypeError, t.read)
3324
Oren Milmanba7d7362017-08-29 11:58:27 +03003325 # Issue 31243: calling read() while the return value of decoder's
3326 # getstate() is invalid should neither crash the interpreter nor
3327 # raise a SystemError.
3328 def _make_very_illegal_wrapper(getstate_ret_val):
3329 class BadDecoder:
3330 def getstate(self):
3331 return getstate_ret_val
3332 def _get_bad_decoder(dummy):
3333 return BadDecoder()
3334 quopri = codecs.lookup("quopri")
3335 with support.swap_attr(quopri, 'incrementaldecoder',
3336 _get_bad_decoder):
3337 return _make_illegal_wrapper()
3338 t = _make_very_illegal_wrapper(42)
3339 self.assertRaises(TypeError, t.read, 42)
3340 t = _make_very_illegal_wrapper(())
3341 self.assertRaises(TypeError, t.read, 42)
3342 t = _make_very_illegal_wrapper((1, 2))
3343 self.assertRaises(TypeError, t.read, 42)
3344
Antoine Pitrou712cb732013-12-21 15:51:54 +01003345 def _check_create_at_shutdown(self, **kwargs):
3346 # Issue #20037: creating a TextIOWrapper at shutdown
3347 # shouldn't crash the interpreter.
3348 iomod = self.io.__name__
3349 code = """if 1:
3350 import codecs
3351 import {iomod} as io
3352
3353 # Avoid looking up codecs at shutdown
3354 codecs.lookup('utf-8')
3355
3356 class C:
3357 def __init__(self):
3358 self.buf = io.BytesIO()
3359 def __del__(self):
3360 io.TextIOWrapper(self.buf, **{kwargs})
3361 print("ok")
3362 c = C()
3363 """.format(iomod=iomod, kwargs=kwargs)
3364 return assert_python_ok("-c", code)
3365
Serhiy Storchakaa7930372016-07-03 22:27:26 +03003366 @support.requires_type_collecting
Antoine Pitrou712cb732013-12-21 15:51:54 +01003367 def test_create_at_shutdown_without_encoding(self):
3368 rc, out, err = self._check_create_at_shutdown()
3369 if err:
3370 # Can error out with a RuntimeError if the module state
3371 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10003372 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01003373 else:
3374 self.assertEqual("ok", out.decode().strip())
3375
Serhiy Storchakaa7930372016-07-03 22:27:26 +03003376 @support.requires_type_collecting
Antoine Pitrou712cb732013-12-21 15:51:54 +01003377 def test_create_at_shutdown_with_encoding(self):
3378 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3379 errors='strict')
3380 self.assertFalse(err)
3381 self.assertEqual("ok", out.decode().strip())
3382
Antoine Pitroub8503892014-04-29 10:14:02 +02003383 def test_read_byteslike(self):
3384 r = MemviewBytesIO(b'Just some random string\n')
3385 t = self.TextIOWrapper(r, 'utf-8')
3386
3387 # TextIOwrapper will not read the full string, because
3388 # we truncate it to a multiple of the native int size
3389 # so that we can construct a more complex memoryview.
3390 bytes_val = _to_memoryview(r.getvalue()).tobytes()
3391
3392 self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3393
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003394 def test_issue22849(self):
3395 class F(object):
3396 def readable(self): return True
3397 def writable(self): return True
3398 def seekable(self): return True
3399
3400 for i in range(10):
3401 try:
3402 self.TextIOWrapper(F(), encoding='utf-8')
3403 except Exception:
3404 pass
3405
3406 F.tell = lambda x: 0
3407 t = self.TextIOWrapper(F(), encoding='utf-8')
3408
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003409
Antoine Pitroub8503892014-04-29 10:14:02 +02003410class MemviewBytesIO(io.BytesIO):
3411 '''A BytesIO object whose read method returns memoryviews
3412 rather than bytes'''
3413
3414 def read1(self, len_):
3415 return _to_memoryview(super().read1(len_))
3416
3417 def read(self, len_):
3418 return _to_memoryview(super().read(len_))
3419
3420def _to_memoryview(buf):
3421 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3422
3423 arr = array.array('i')
3424 idx = len(buf) - len(buf) % arr.itemsize
3425 arr.frombytes(buf[:idx])
3426 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003427
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003428
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003429class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003430 io = io
Steve Dowerfe8f4c92016-09-07 09:31:52 -07003431 shutdown_error = "RuntimeError: could not find io module state"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003432
3433 def test_initialization(self):
3434 r = self.BytesIO(b"\xc3\xa9\n\n")
3435 b = self.BufferedReader(r, 1000)
3436 t = self.TextIOWrapper(b)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003437 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
3438 self.assertRaises(ValueError, t.read)
3439
Benjamin Peterson10e76b62014-12-21 20:51:50 -06003440 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
3441 self.assertRaises(Exception, repr, t)
3442
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003443 def test_garbage_collection(self):
3444 # C TextIOWrapper objects are collected, and collecting them flushes
3445 # all data to disk.
3446 # The Python version has __del__, so it ends in gc.garbage instead.
Antoine Pitrou796564c2013-07-30 19:59:21 +02003447 with support.check_warnings(('', ResourceWarning)):
3448 rawio = io.FileIO(support.TESTFN, "wb")
3449 b = self.BufferedWriter(rawio)
3450 t = self.TextIOWrapper(b, encoding="ascii")
3451 t.write("456def")
3452 t.x = t
3453 wr = weakref.ref(t)
3454 del t
3455 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003456 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00003457 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003458 self.assertEqual(f.read(), b"456def")
3459
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003460 def test_rwpair_cleared_before_textio(self):
3461 # Issue 13070: TextIOWrapper's finalization would crash when called
3462 # after the reference to the underlying BufferedRWPair's writer got
3463 # cleared by the GC.
3464 for i in range(1000):
3465 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3466 t1 = self.TextIOWrapper(b1, encoding="ascii")
3467 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3468 t2 = self.TextIOWrapper(b2, encoding="ascii")
3469 # circular references
3470 t1.buddy = t2
3471 t2.buddy = t1
3472 support.gc_collect()
3473
3474
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003475class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003476 io = pyio
Steve Dower6d5cf8a2016-09-09 09:21:01 -07003477 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003478
3479
3480class IncrementalNewlineDecoderTest(unittest.TestCase):
3481
3482 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003483 # UTF-8 specific tests for a newline decoder
3484 def _check_decode(b, s, **kwargs):
3485 # We exercise getstate() / setstate() as well as decode()
3486 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003487 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003488 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003489 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003490
Antoine Pitrou180a3362008-12-14 16:36:46 +00003491 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003492
Antoine Pitrou180a3362008-12-14 16:36:46 +00003493 _check_decode(b'\xe8', "")
3494 _check_decode(b'\xa2', "")
3495 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003496
Antoine Pitrou180a3362008-12-14 16:36:46 +00003497 _check_decode(b'\xe8', "")
3498 _check_decode(b'\xa2', "")
3499 _check_decode(b'\x88', "\u8888")
3500
3501 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003502 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3503
Antoine Pitrou180a3362008-12-14 16:36:46 +00003504 decoder.reset()
3505 _check_decode(b'\n', "\n")
3506 _check_decode(b'\r', "")
3507 _check_decode(b'', "\n", final=True)
3508 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003509
Antoine Pitrou180a3362008-12-14 16:36:46 +00003510 _check_decode(b'\r', "")
3511 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003512
Antoine Pitrou180a3362008-12-14 16:36:46 +00003513 _check_decode(b'\r\r\n', "\n\n")
3514 _check_decode(b'\r', "")
3515 _check_decode(b'\r', "\n")
3516 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003517
Antoine Pitrou180a3362008-12-14 16:36:46 +00003518 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3519 _check_decode(b'\xe8\xa2\x88', "\u8888")
3520 _check_decode(b'\n', "\n")
3521 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3522 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003523
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003524 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003525 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003526 if encoding is not None:
3527 encoder = codecs.getincrementalencoder(encoding)()
3528 def _decode_bytewise(s):
3529 # Decode one byte at a time
3530 for b in encoder.encode(s):
3531 result.append(decoder.decode(bytes([b])))
3532 else:
3533 encoder = None
3534 def _decode_bytewise(s):
3535 # Decode one char at a time
3536 for c in s:
3537 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003538 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003539 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003540 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003541 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003542 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003543 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003544 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003545 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003546 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003547 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003548 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003549 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003550 input = "abc"
3551 if encoder is not None:
3552 encoder.reset()
3553 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003554 self.assertEqual(decoder.decode(input), "abc")
3555 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003556
3557 def test_newline_decoder(self):
3558 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003559 # None meaning the IncrementalNewlineDecoder takes unicode input
3560 # rather than bytes input
3561 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003562 'utf-16', 'utf-16-le', 'utf-16-be',
3563 'utf-32', 'utf-32-le', 'utf-32-be',
3564 )
3565 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003566 decoder = enc and codecs.getincrementaldecoder(enc)()
3567 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3568 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003569 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003570 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3571 self.check_newline_decoding_utf8(decoder)
Oren Milman1d1d3e92017-08-20 18:35:36 +03003572 self.assertRaises(TypeError, decoder.setstate, 42)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003573
Antoine Pitrou66913e22009-03-06 23:40:56 +00003574 def test_newline_bytes(self):
3575 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3576 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003577 self.assertEqual(dec.newlines, None)
3578 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3579 self.assertEqual(dec.newlines, None)
3580 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3581 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003582 dec = self.IncrementalNewlineDecoder(None, translate=False)
3583 _check(dec)
3584 dec = self.IncrementalNewlineDecoder(None, translate=True)
3585 _check(dec)
3586
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003587class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3588 pass
3589
3590class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3591 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003592
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003593
Guido van Rossum01a27522007-03-07 01:00:12 +00003594# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003595
Guido van Rossum5abbf752007-08-27 17:39:33 +00003596class MiscIOTest(unittest.TestCase):
3597
Barry Warsaw40e82462008-11-20 20:14:50 +00003598 def tearDown(self):
3599 support.unlink(support.TESTFN)
3600
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003601 def test___all__(self):
3602 for name in self.io.__all__:
3603 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003604 self.assertIsNotNone(obj, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00003605 if name == "open":
3606 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003607 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003608 self.assertTrue(issubclass(obj, Exception), name)
3609 elif not name.startswith("SEEK_"):
3610 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003611
Barry Warsaw40e82462008-11-20 20:14:50 +00003612 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003613 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003614 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003615 f.close()
3616
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003617 with support.check_warnings(('', DeprecationWarning)):
3618 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003619 self.assertEqual(f.name, support.TESTFN)
3620 self.assertEqual(f.buffer.name, support.TESTFN)
3621 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3622 self.assertEqual(f.mode, "U")
3623 self.assertEqual(f.buffer.mode, "rb")
3624 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003625 f.close()
3626
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003627 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003628 self.assertEqual(f.mode, "w+")
3629 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3630 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003631
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003632 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003633 self.assertEqual(g.mode, "wb")
3634 self.assertEqual(g.raw.mode, "wb")
3635 self.assertEqual(g.name, f.fileno())
3636 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003637 f.close()
3638 g.close()
3639
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003640 def test_io_after_close(self):
3641 for kwargs in [
3642 {"mode": "w"},
3643 {"mode": "wb"},
3644 {"mode": "w", "buffering": 1},
3645 {"mode": "w", "buffering": 2},
3646 {"mode": "wb", "buffering": 0},
3647 {"mode": "r"},
3648 {"mode": "rb"},
3649 {"mode": "r", "buffering": 1},
3650 {"mode": "r", "buffering": 2},
3651 {"mode": "rb", "buffering": 0},
3652 {"mode": "w+"},
3653 {"mode": "w+b"},
3654 {"mode": "w+", "buffering": 1},
3655 {"mode": "w+", "buffering": 2},
3656 {"mode": "w+b", "buffering": 0},
3657 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003658 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003659 f.close()
3660 self.assertRaises(ValueError, f.flush)
3661 self.assertRaises(ValueError, f.fileno)
3662 self.assertRaises(ValueError, f.isatty)
3663 self.assertRaises(ValueError, f.__iter__)
3664 if hasattr(f, "peek"):
3665 self.assertRaises(ValueError, f.peek, 1)
3666 self.assertRaises(ValueError, f.read)
3667 if hasattr(f, "read1"):
3668 self.assertRaises(ValueError, f.read1, 1024)
Martin Panterccb2c0e2016-10-20 23:48:14 +00003669 self.assertRaises(ValueError, f.read1)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003670 if hasattr(f, "readall"):
3671 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003672 if hasattr(f, "readinto"):
3673 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003674 if hasattr(f, "readinto1"):
3675 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003676 self.assertRaises(ValueError, f.readline)
3677 self.assertRaises(ValueError, f.readlines)
Xiang Zhang026435c2017-04-15 12:47:28 +08003678 self.assertRaises(ValueError, f.readlines, 1)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003679 self.assertRaises(ValueError, f.seek, 0)
3680 self.assertRaises(ValueError, f.tell)
3681 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003682 self.assertRaises(ValueError, f.write,
3683 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003684 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003685 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003686
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003687 def test_blockingioerror(self):
3688 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003689 class C(str):
3690 pass
3691 c = C("")
3692 b = self.BlockingIOError(1, c)
3693 c.b = b
3694 b.c = c
3695 wr = weakref.ref(c)
3696 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003697 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003698 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003699
3700 def test_abcs(self):
3701 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003702 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3703 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3704 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3705 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003706
3707 def _check_abc_inheritance(self, abcmodule):
3708 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003709 self.assertIsInstance(f, abcmodule.IOBase)
3710 self.assertIsInstance(f, abcmodule.RawIOBase)
3711 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3712 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003713 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003714 self.assertIsInstance(f, abcmodule.IOBase)
3715 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3716 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3717 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003718 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003719 self.assertIsInstance(f, abcmodule.IOBase)
3720 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3721 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3722 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003723
3724 def test_abc_inheritance(self):
3725 # Test implementations inherit from their respective ABCs
3726 self._check_abc_inheritance(self)
3727
3728 def test_abc_inheritance_official(self):
3729 # Test implementations inherit from the official ABCs of the
3730 # baseline "io" module.
3731 self._check_abc_inheritance(io)
3732
Antoine Pitroue033e062010-10-29 10:38:18 +00003733 def _check_warn_on_dealloc(self, *args, **kwargs):
3734 f = open(*args, **kwargs)
3735 r = repr(f)
3736 with self.assertWarns(ResourceWarning) as cm:
3737 f = None
3738 support.gc_collect()
3739 self.assertIn(r, str(cm.warning.args[0]))
3740
3741 def test_warn_on_dealloc(self):
3742 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3743 self._check_warn_on_dealloc(support.TESTFN, "wb")
3744 self._check_warn_on_dealloc(support.TESTFN, "w")
3745
3746 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3747 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003748 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003749 for fd in fds:
3750 try:
3751 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003752 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003753 if e.errno != errno.EBADF:
3754 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003755 self.addCleanup(cleanup_fds)
3756 r, w = os.pipe()
3757 fds += r, w
3758 self._check_warn_on_dealloc(r, *args, **kwargs)
3759 # When using closefd=False, there's no warning
3760 r, w = os.pipe()
3761 fds += r, w
Serhiy Storchaka94a619d2016-02-11 13:11:44 +02003762 with support.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00003763 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00003764
3765 def test_warn_on_dealloc_fd(self):
3766 self._check_warn_on_dealloc_fd("rb", buffering=0)
3767 self._check_warn_on_dealloc_fd("rb")
3768 self._check_warn_on_dealloc_fd("r")
3769
3770
Antoine Pitrou243757e2010-11-05 21:15:39 +00003771 def test_pickling(self):
3772 # Pickling file objects is forbidden
3773 for kwargs in [
3774 {"mode": "w"},
3775 {"mode": "wb"},
3776 {"mode": "wb", "buffering": 0},
3777 {"mode": "r"},
3778 {"mode": "rb"},
3779 {"mode": "rb", "buffering": 0},
3780 {"mode": "w+"},
3781 {"mode": "w+b"},
3782 {"mode": "w+b", "buffering": 0},
3783 ]:
3784 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3785 with self.open(support.TESTFN, **kwargs) as f:
3786 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3787
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003788 def test_nonblock_pipe_write_bigbuf(self):
3789 self._test_nonblock_pipe_write(16*1024)
3790
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003791 def test_nonblock_pipe_write_smallbuf(self):
3792 self._test_nonblock_pipe_write(1024)
3793
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003794 @unittest.skipUnless(hasattr(os, 'set_blocking'),
3795 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003796 def _test_nonblock_pipe_write(self, bufsize):
3797 sent = []
3798 received = []
3799 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003800 os.set_blocking(r, False)
3801 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003802
3803 # To exercise all code paths in the C implementation we need
3804 # to play with buffer sizes. For instance, if we choose a
3805 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3806 # then we will never get a partial write of the buffer.
3807 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3808 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3809
3810 with rf, wf:
3811 for N in 9999, 73, 7574:
3812 try:
3813 i = 0
3814 while True:
3815 msg = bytes([i % 26 + 97]) * N
3816 sent.append(msg)
3817 wf.write(msg)
3818 i += 1
3819
3820 except self.BlockingIOError as e:
3821 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003822 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003823 sent[-1] = sent[-1][:e.characters_written]
3824 received.append(rf.read())
3825 msg = b'BLOCKED'
3826 wf.write(msg)
3827 sent.append(msg)
3828
3829 while True:
3830 try:
3831 wf.flush()
3832 break
3833 except self.BlockingIOError as e:
3834 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003835 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003836 self.assertEqual(e.characters_written, 0)
3837 received.append(rf.read())
3838
3839 received += iter(rf.read, None)
3840
3841 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003842 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003843 self.assertTrue(wf.closed)
3844 self.assertTrue(rf.closed)
3845
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003846 def test_create_fail(self):
3847 # 'x' mode fails if file is existing
3848 with self.open(support.TESTFN, 'w'):
3849 pass
3850 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3851
3852 def test_create_writes(self):
3853 # 'x' mode opens for writing
3854 with self.open(support.TESTFN, 'xb') as f:
3855 f.write(b"spam")
3856 with self.open(support.TESTFN, 'rb') as f:
3857 self.assertEqual(b"spam", f.read())
3858
Christian Heimes7b648752012-09-10 14:48:43 +02003859 def test_open_allargs(self):
3860 # there used to be a buffer overflow in the parser for rawmode
3861 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3862
3863
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003864class CMiscIOTest(MiscIOTest):
3865 io = io
3866
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03003867 def test_readinto_buffer_overflow(self):
3868 # Issue #18025
3869 class BadReader(self.io.BufferedIOBase):
3870 def read(self, n=-1):
3871 return b'x' * 10**6
3872 bufio = BadReader()
3873 b = bytearray(2)
3874 self.assertRaises(ValueError, bufio.readinto, b)
3875
Antoine Pitrou25f85d42015-04-13 19:41:47 +02003876 def check_daemon_threads_shutdown_deadlock(self, stream_name):
3877 # Issue #23309: deadlocks at shutdown should be avoided when a
3878 # daemon thread and the main thread both write to a file.
3879 code = """if 1:
3880 import sys
3881 import time
3882 import threading
Victor Stinner2a1aed02017-04-21 17:59:23 +02003883 from test.support import SuppressCrashReport
Antoine Pitrou25f85d42015-04-13 19:41:47 +02003884
3885 file = sys.{stream_name}
3886
3887 def run():
3888 while True:
3889 file.write('.')
3890 file.flush()
3891
Victor Stinner2a1aed02017-04-21 17:59:23 +02003892 crash = SuppressCrashReport()
3893 crash.__enter__()
3894 # don't call __exit__(): the crash occurs at Python shutdown
3895
Antoine Pitrou25f85d42015-04-13 19:41:47 +02003896 thread = threading.Thread(target=run)
3897 thread.daemon = True
3898 thread.start()
3899
3900 time.sleep(0.5)
3901 file.write('!')
3902 file.flush()
3903 """.format_map(locals())
3904 res, _ = run_python_until_end("-c", code)
3905 err = res.err.decode()
3906 if res.rc != 0:
3907 # Failure: should be a fatal error
3908 self.assertIn("Fatal Python error: could not acquire lock "
3909 "for <_io.BufferedWriter name='<{stream_name}>'> "
3910 "at interpreter shutdown, possibly due to "
3911 "daemon threads".format_map(locals()),
3912 err)
3913 else:
3914 self.assertFalse(err.strip('.!'))
3915
3916 def test_daemon_threads_shutdown_stdout_deadlock(self):
3917 self.check_daemon_threads_shutdown_deadlock('stdout')
3918
3919 def test_daemon_threads_shutdown_stderr_deadlock(self):
3920 self.check_daemon_threads_shutdown_deadlock('stderr')
3921
3922
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003923class PyMiscIOTest(MiscIOTest):
3924 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003925
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003926
3927@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3928class SignalsTest(unittest.TestCase):
3929
3930 def setUp(self):
3931 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3932
3933 def tearDown(self):
3934 signal.signal(signal.SIGALRM, self.oldalrm)
3935
3936 def alarm_interrupt(self, sig, frame):
3937 1/0
3938
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003939 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3940 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00003941 invokes the signal handler, and bubbles up the exception raised
3942 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003943 read_results = []
3944 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02003945 if hasattr(signal, 'pthread_sigmask'):
3946 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003947 s = os.read(r, 1)
3948 read_results.append(s)
3949 t = threading.Thread(target=_read)
3950 t.daemon = True
3951 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00003952 fdopen_kwargs["closefd"] = False
Victor Stinner9c182a12015-03-16 18:03:06 +01003953 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003954 try:
3955 wio = self.io.open(w, **fdopen_kwargs)
3956 t.start()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003957 # Fill the pipe enough that the write will be blocking.
3958 # It will be interrupted by the timer armed above. Since the
3959 # other thread has read one byte, the low-level write will
3960 # return with a successful (partial) result rather than an EINTR.
3961 # The buffered IO layer must check for pending signal
3962 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02003963 signal.alarm(1)
3964 try:
Victor Stinner9c182a12015-03-16 18:03:06 +01003965 self.assertRaises(ZeroDivisionError, wio.write, large_data)
Victor Stinner775b2dd2013-07-15 19:53:13 +02003966 finally:
3967 signal.alarm(0)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003968 t.join()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003969 # We got one byte, get another one and check that it isn't a
3970 # repeat of the first one.
3971 read_results.append(os.read(r, 1))
3972 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3973 finally:
3974 os.close(w)
3975 os.close(r)
3976 # This is deliberate. If we didn't close the file descriptor
3977 # before closing wio, wio would try to flush its internal
3978 # buffer, and block again.
3979 try:
3980 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003981 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003982 if e.errno != errno.EBADF:
3983 raise
3984
3985 def test_interrupted_write_unbuffered(self):
3986 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3987
3988 def test_interrupted_write_buffered(self):
3989 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3990
Victor Stinner6ab72862014-09-03 23:32:28 +02003991 # Issue #22331: The test hangs on FreeBSD 7.2
3992 @support.requires_freebsd_version(8)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003993 def test_interrupted_write_text(self):
3994 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3995
Brett Cannon31f59292011-02-21 19:29:56 +00003996 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003997 def check_reentrant_write(self, data, **fdopen_kwargs):
3998 def on_alarm(*args):
3999 # Will be called reentrantly from the same thread
4000 wio.write(data)
4001 1/0
4002 signal.signal(signal.SIGALRM, on_alarm)
4003 r, w = os.pipe()
4004 wio = self.io.open(w, **fdopen_kwargs)
4005 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004006 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004007 # Either the reentrant call to wio.write() fails with RuntimeError,
4008 # or the signal handler raises ZeroDivisionError.
4009 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
4010 while 1:
4011 for i in range(100):
4012 wio.write(data)
4013 wio.flush()
4014 # Make sure the buffer doesn't fill up and block further writes
4015 os.read(r, len(data) * 100)
4016 exc = cm.exception
4017 if isinstance(exc, RuntimeError):
4018 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
4019 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004020 signal.alarm(0)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004021 wio.close()
4022 os.close(r)
4023
4024 def test_reentrant_write_buffered(self):
4025 self.check_reentrant_write(b"xy", mode="wb")
4026
4027 def test_reentrant_write_text(self):
4028 self.check_reentrant_write("xy", mode="w", encoding="ascii")
4029
Antoine Pitrou707ce822011-02-25 21:24:11 +00004030 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
4031 """Check that a buffered read, when it gets interrupted (either
4032 returning a partial result or EINTR), properly invokes the signal
4033 handler and retries if the latter returned successfully."""
4034 r, w = os.pipe()
4035 fdopen_kwargs["closefd"] = False
4036 def alarm_handler(sig, frame):
4037 os.write(w, b"bar")
4038 signal.signal(signal.SIGALRM, alarm_handler)
4039 try:
4040 rio = self.io.open(r, **fdopen_kwargs)
4041 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004042 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004043 # Expected behaviour:
4044 # - first raw read() returns partial b"foo"
4045 # - second raw read() returns EINTR
4046 # - third raw read() returns b"bar"
4047 self.assertEqual(decode(rio.read(6)), "foobar")
4048 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004049 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004050 rio.close()
4051 os.close(w)
4052 os.close(r)
4053
Antoine Pitrou20db5112011-08-19 20:32:34 +02004054 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004055 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
4056 mode="rb")
4057
Antoine Pitrou20db5112011-08-19 20:32:34 +02004058 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004059 self.check_interrupted_read_retry(lambda x: x,
4060 mode="r")
4061
Antoine Pitrou707ce822011-02-25 21:24:11 +00004062 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
4063 """Check that a buffered write, when it gets interrupted (either
4064 returning a partial result or EINTR), properly invokes the signal
4065 handler and retries if the latter returned successfully."""
4066 select = support.import_module("select")
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004067
Antoine Pitrou707ce822011-02-25 21:24:11 +00004068 # A quantity that exceeds the buffer size of an anonymous pipe's
4069 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02004070 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00004071 r, w = os.pipe()
4072 fdopen_kwargs["closefd"] = False
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004073
Antoine Pitrou707ce822011-02-25 21:24:11 +00004074 # We need a separate thread to read from the pipe and allow the
4075 # write() to finish. This thread is started after the SIGALRM is
4076 # received (forcing a first EINTR in write()).
4077 read_results = []
4078 write_finished = False
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004079 error = None
Antoine Pitrou707ce822011-02-25 21:24:11 +00004080 def _read():
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004081 try:
4082 while not write_finished:
4083 while r in select.select([r], [], [], 1.0)[0]:
4084 s = os.read(r, 1024)
4085 read_results.append(s)
4086 except BaseException as exc:
4087 nonlocal error
4088 error = exc
Antoine Pitrou707ce822011-02-25 21:24:11 +00004089 t = threading.Thread(target=_read)
4090 t.daemon = True
4091 def alarm1(sig, frame):
4092 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004093 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004094 def alarm2(sig, frame):
4095 t.start()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004096
4097 large_data = item * N
Antoine Pitrou707ce822011-02-25 21:24:11 +00004098 signal.signal(signal.SIGALRM, alarm1)
4099 try:
4100 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004101 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004102 # Expected behaviour:
4103 # - first raw write() is partial (because of the limited pipe buffer
4104 # and the first alarm)
4105 # - second raw write() returns EINTR (because of the second alarm)
4106 # - subsequent write()s are successful (either partial or complete)
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004107 written = wio.write(large_data)
4108 self.assertEqual(N, written)
4109
Antoine Pitrou707ce822011-02-25 21:24:11 +00004110 wio.flush()
4111 write_finished = True
4112 t.join()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004113
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004114 self.assertIsNone(error)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004115 self.assertEqual(N, sum(len(x) for x in read_results))
4116 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004117 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004118 write_finished = True
4119 os.close(w)
4120 os.close(r)
4121 # This is deliberate. If we didn't close the file descriptor
4122 # before closing wio, wio would try to flush its internal
4123 # buffer, and could block (in case of failure).
4124 try:
4125 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004126 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00004127 if e.errno != errno.EBADF:
4128 raise
4129
Antoine Pitrou20db5112011-08-19 20:32:34 +02004130 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004131 self.check_interrupted_write_retry(b"x", mode="wb")
4132
Antoine Pitrou20db5112011-08-19 20:32:34 +02004133 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004134 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4135
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004136
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004137class CSignalsTest(SignalsTest):
4138 io = io
4139
4140class PySignalsTest(SignalsTest):
4141 io = pyio
4142
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004143 # Handling reentrancy issues would slow down _pyio even more, so the
4144 # tests are disabled.
4145 test_reentrant_write_buffered = None
4146 test_reentrant_write_text = None
4147
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004148
Ezio Melottidaa42c72013-03-23 16:30:16 +02004149def load_tests(*args):
Gregory P. Smith1bef9072015-04-14 13:24:34 -07004150 tests = (CIOTest, PyIOTest, APIMismatchTest,
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004151 CBufferedReaderTest, PyBufferedReaderTest,
4152 CBufferedWriterTest, PyBufferedWriterTest,
4153 CBufferedRWPairTest, PyBufferedRWPairTest,
4154 CBufferedRandomTest, PyBufferedRandomTest,
4155 StatefulIncrementalDecoderTest,
4156 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
4157 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004158 CMiscIOTest, PyMiscIOTest,
4159 CSignalsTest, PySignalsTest,
4160 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004161
4162 # Put the namespaces of the IO module we are testing and some useful mock
4163 # classes in the __dict__ of each test.
4164 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
benfogle9703f092017-11-10 16:03:40 -05004165 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
4166 SlowFlushRawIO)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004167 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
4168 c_io_ns = {name : getattr(io, name) for name in all_members}
4169 py_io_ns = {name : getattr(pyio, name) for name in all_members}
4170 globs = globals()
4171 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
4172 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
4173 # Avoid turning open into a bound method.
4174 py_io_ns["open"] = pyio.OpenWrapper
4175 for test in tests:
4176 if test.__name__.startswith("C"):
4177 for name, obj in c_io_ns.items():
4178 setattr(test, name, obj)
4179 elif test.__name__.startswith("Py"):
4180 for name, obj in py_io_ns.items():
4181 setattr(test, name, obj)
4182
Ezio Melottidaa42c72013-03-23 16:30:16 +02004183 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
4184 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004185
4186if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02004187 unittest.main()