blob: 66748317b5f214e1df5adc25e8b52293c834bf66 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020031import threading
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000032import time
Guido van Rossum28524c72007-02-27 05:47:44 +000033import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000034import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020035import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020036from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000038from test import support
Berker Peksagce643912015-05-06 06:33:17 +030039from test.support.script_helper import assert_python_ok, run_python_until_end
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000040
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000041import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000042import io # C implementation of io
43import _pyio as pyio # Python implementation of io
Guido van Rossuma9e20242007-03-08 00:43:48 +000044
Martin Panter6bb91f32016-05-28 00:41:57 +000045try:
46 import ctypes
47except ImportError:
48 def byteslike(*pos, **kw):
49 return array.array("b", bytes(*pos, **kw))
50else:
51 def byteslike(*pos, **kw):
52 """Create a bytes-like object having no string or sequence methods"""
53 data = bytes(*pos, **kw)
54 obj = EmptyStruct()
55 ctypes.resize(obj, len(data))
56 memoryview(obj).cast("B")[:] = data
57 return obj
58 class EmptyStruct(ctypes.Structure):
59 pass
60
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000061def _default_chunk_size():
62 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000063 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000064 return f._CHUNK_SIZE
65
66
Antoine Pitrou328ec742010-09-14 18:37:24 +000067class MockRawIOWithoutRead:
68 """A RawIO implementation without read(), so as to exercise the default
69 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000070
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000071 def __init__(self, read_stack=()):
72 self._read_stack = list(read_stack)
73 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000074 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000075 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000076
Guido van Rossum01a27522007-03-07 01:00:12 +000077 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000078 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000079 return len(b)
80
81 def writable(self):
82 return True
83
Guido van Rossum68bbcd22007-02-27 17:19:33 +000084 def fileno(self):
85 return 42
86
87 def readable(self):
88 return True
89
Guido van Rossum01a27522007-03-07 01:00:12 +000090 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000091 return True
92
Guido van Rossum01a27522007-03-07 01:00:12 +000093 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000094 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000095
96 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000097 return 0 # same comment as above
98
99 def readinto(self, buf):
100 self._reads += 1
101 max_len = len(buf)
102 try:
103 data = self._read_stack[0]
104 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000105 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000106 return 0
107 if data is None:
108 del self._read_stack[0]
109 return None
110 n = len(data)
111 if len(data) <= max_len:
112 del self._read_stack[0]
113 buf[:n] = data
114 return n
115 else:
116 buf[:] = data[:max_len]
117 self._read_stack[0] = data[max_len:]
118 return max_len
119
120 def truncate(self, pos=None):
121 return pos
122
Antoine Pitrou328ec742010-09-14 18:37:24 +0000123class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
124 pass
125
126class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
127 pass
128
129
130class MockRawIO(MockRawIOWithoutRead):
131
132 def read(self, n=None):
133 self._reads += 1
134 try:
135 return self._read_stack.pop(0)
136 except:
137 self._extraneous_reads += 1
138 return b""
139
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000140class CMockRawIO(MockRawIO, io.RawIOBase):
141 pass
142
143class PyMockRawIO(MockRawIO, pyio.RawIOBase):
144 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000145
Guido van Rossuma9e20242007-03-08 00:43:48 +0000146
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000147class MisbehavedRawIO(MockRawIO):
148 def write(self, b):
149 return super().write(b) * 2
150
151 def read(self, n=None):
152 return super().read(n) * 2
153
154 def seek(self, pos, whence):
155 return -123
156
157 def tell(self):
158 return -456
159
160 def readinto(self, buf):
161 super().readinto(buf)
162 return len(buf) * 5
163
164class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
165 pass
166
167class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
168 pass
169
170
benfogle9703f092017-11-10 16:03:40 -0500171class SlowFlushRawIO(MockRawIO):
172 def __init__(self):
173 super().__init__()
174 self.in_flush = threading.Event()
175
176 def flush(self):
177 self.in_flush.set()
178 time.sleep(0.25)
179
180class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
181 pass
182
183class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
184 pass
185
186
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000187class CloseFailureIO(MockRawIO):
188 closed = 0
189
190 def close(self):
191 if not self.closed:
192 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200193 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000194
195class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
196 pass
197
198class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
199 pass
200
201
202class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000203
204 def __init__(self, data):
205 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000206 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000207
208 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000209 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000210 self.read_history.append(None if res is None else len(res))
211 return res
212
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000213 def readinto(self, b):
214 res = super().readinto(b)
215 self.read_history.append(res)
216 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000217
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000218class CMockFileIO(MockFileIO, io.BytesIO):
219 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000220
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000221class PyMockFileIO(MockFileIO, pyio.BytesIO):
222 pass
223
224
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000225class MockUnseekableIO:
226 def seekable(self):
227 return False
228
229 def seek(self, *args):
230 raise self.UnsupportedOperation("not seekable")
231
232 def tell(self, *args):
233 raise self.UnsupportedOperation("not seekable")
234
Martin Panter754aab22016-03-31 07:21:56 +0000235 def truncate(self, *args):
236 raise self.UnsupportedOperation("not seekable")
237
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000238class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
239 UnsupportedOperation = io.UnsupportedOperation
240
241class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
242 UnsupportedOperation = pyio.UnsupportedOperation
243
244
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000245class MockNonBlockWriterIO:
246
247 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000248 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000249 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000250
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000251 def pop_written(self):
252 s = b"".join(self._write_stack)
253 self._write_stack[:] = []
254 return s
255
256 def block_on(self, char):
257 """Block when a given char is encountered."""
258 self._blocker_char = char
259
260 def readable(self):
261 return True
262
263 def seekable(self):
264 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000265
Guido van Rossum01a27522007-03-07 01:00:12 +0000266 def writable(self):
267 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000268
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000269 def write(self, b):
270 b = bytes(b)
271 n = -1
272 if self._blocker_char:
273 try:
274 n = b.index(self._blocker_char)
275 except ValueError:
276 pass
277 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100278 if n > 0:
279 # write data up to the first blocker
280 self._write_stack.append(b[:n])
281 return n
282 else:
283 # cancel blocker and indicate would block
284 self._blocker_char = None
285 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000286 self._write_stack.append(b)
287 return len(b)
288
289class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
290 BlockingIOError = io.BlockingIOError
291
292class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
293 BlockingIOError = pyio.BlockingIOError
294
Guido van Rossuma9e20242007-03-08 00:43:48 +0000295
Guido van Rossum28524c72007-02-27 05:47:44 +0000296class IOTest(unittest.TestCase):
297
Neal Norwitze7789b12008-03-24 06:18:09 +0000298 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000299 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000300
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000301 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000302 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000303
Guido van Rossum28524c72007-02-27 05:47:44 +0000304 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000305 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000306 f.truncate(0)
307 self.assertEqual(f.tell(), 5)
308 f.seek(0)
309
310 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000311 self.assertEqual(f.seek(0), 0)
312 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000313 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000314 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000315 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000316 buffer = bytearray(b" world\n\n\n")
317 self.assertEqual(f.write(buffer), 9)
318 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000319 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000320 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000321 self.assertEqual(f.seek(-1, 2), 13)
322 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000323
Guido van Rossum87429772007-04-10 21:06:59 +0000324 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000325 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000326 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000327
Guido van Rossum9b76da62007-04-11 01:09:03 +0000328 def read_ops(self, f, buffered=False):
329 data = f.read(5)
330 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000331 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000332 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000333 self.assertEqual(bytes(data), b" worl")
334 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000335 self.assertEqual(f.readinto(data), 2)
336 self.assertEqual(len(data), 5)
337 self.assertEqual(data[:2], b"d\n")
338 self.assertEqual(f.seek(0), 0)
339 self.assertEqual(f.read(20), b"hello world\n")
340 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000341 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000342 self.assertEqual(f.seek(-6, 2), 6)
343 self.assertEqual(f.read(5), b"world")
344 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000345 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000346 self.assertEqual(f.seek(-6, 1), 5)
347 self.assertEqual(f.read(5), b" worl")
348 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000349 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000350 if buffered:
351 f.seek(0)
352 self.assertEqual(f.read(), b"hello world\n")
353 f.seek(6)
354 self.assertEqual(f.read(), b"world\n")
355 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000356 f.seek(0)
357 data = byteslike(5)
358 self.assertEqual(f.readinto1(data), 5)
359 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000360
Guido van Rossum34d69e52007-04-10 20:08:41 +0000361 LARGE = 2**31
362
Guido van Rossum53807da2007-04-10 19:01:47 +0000363 def large_file_ops(self, f):
364 assert f.readable()
365 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100366 try:
367 self.assertEqual(f.seek(self.LARGE), self.LARGE)
368 except (OverflowError, ValueError):
369 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000370 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000371 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000372 self.assertEqual(f.tell(), self.LARGE + 3)
373 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000374 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000375 self.assertEqual(f.tell(), self.LARGE + 2)
376 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000377 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000378 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000379 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
380 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000381 self.assertEqual(f.read(2), b"x")
382
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000383 def test_invalid_operations(self):
384 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000385 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000386 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000387 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000388 self.assertRaises(exc, fp.read)
389 self.assertRaises(exc, fp.readline)
390 with self.open(support.TESTFN, "wb", buffering=0) as fp:
391 self.assertRaises(exc, fp.read)
392 self.assertRaises(exc, fp.readline)
393 with self.open(support.TESTFN, "rb", buffering=0) as fp:
394 self.assertRaises(exc, fp.write, b"blah")
395 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000396 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000397 self.assertRaises(exc, fp.write, b"blah")
398 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000399 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000400 self.assertRaises(exc, fp.write, "blah")
401 self.assertRaises(exc, fp.writelines, ["blah\n"])
402 # Non-zero seeking from current or end pos
403 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
404 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000405
Martin Panter754aab22016-03-31 07:21:56 +0000406 def test_optional_abilities(self):
407 # Test for OSError when optional APIs are not supported
408 # The purpose of this test is to try fileno(), reading, writing and
409 # seeking operations with various objects that indicate they do not
410 # support these operations.
411
412 def pipe_reader():
413 [r, w] = os.pipe()
414 os.close(w) # So that read() is harmless
415 return self.FileIO(r, "r")
416
417 def pipe_writer():
418 [r, w] = os.pipe()
419 self.addCleanup(os.close, r)
420 # Guarantee that we can write into the pipe without blocking
421 thread = threading.Thread(target=os.read, args=(r, 100))
422 thread.start()
423 self.addCleanup(thread.join)
424 return self.FileIO(w, "w")
425
426 def buffered_reader():
427 return self.BufferedReader(self.MockUnseekableIO())
428
429 def buffered_writer():
430 return self.BufferedWriter(self.MockUnseekableIO())
431
432 def buffered_random():
433 return self.BufferedRandom(self.BytesIO())
434
435 def buffered_rw_pair():
436 return self.BufferedRWPair(self.MockUnseekableIO(),
437 self.MockUnseekableIO())
438
439 def text_reader():
440 class UnseekableReader(self.MockUnseekableIO):
441 writable = self.BufferedIOBase.writable
442 write = self.BufferedIOBase.write
443 return self.TextIOWrapper(UnseekableReader(), "ascii")
444
445 def text_writer():
446 class UnseekableWriter(self.MockUnseekableIO):
447 readable = self.BufferedIOBase.readable
448 read = self.BufferedIOBase.read
449 return self.TextIOWrapper(UnseekableWriter(), "ascii")
450
451 tests = (
452 (pipe_reader, "fr"), (pipe_writer, "fw"),
453 (buffered_reader, "r"), (buffered_writer, "w"),
454 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
455 (text_reader, "r"), (text_writer, "w"),
456 (self.BytesIO, "rws"), (self.StringIO, "rws"),
457 )
458 for [test, abilities] in tests:
Martin Panter754aab22016-03-31 07:21:56 +0000459 with self.subTest(test), test() as obj:
460 readable = "r" in abilities
461 self.assertEqual(obj.readable(), readable)
462 writable = "w" in abilities
463 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000464
465 if isinstance(obj, self.TextIOBase):
466 data = "3"
467 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
468 data = b"3"
469 else:
470 self.fail("Unknown base class")
471
472 if "f" in abilities:
473 obj.fileno()
474 else:
475 self.assertRaises(OSError, obj.fileno)
476
477 if readable:
478 obj.read(1)
479 obj.read()
480 else:
481 self.assertRaises(OSError, obj.read, 1)
482 self.assertRaises(OSError, obj.read)
483
484 if writable:
485 obj.write(data)
486 else:
487 self.assertRaises(OSError, obj.write, data)
488
Martin Panter3ee147f2016-03-31 21:05:31 +0000489 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000490 pipe_reader, pipe_writer):
491 # Pipes seem to appear as seekable on Windows
492 continue
493 seekable = "s" in abilities
494 self.assertEqual(obj.seekable(), seekable)
495
Martin Panter754aab22016-03-31 07:21:56 +0000496 if seekable:
497 obj.tell()
498 obj.seek(0)
499 else:
500 self.assertRaises(OSError, obj.tell)
501 self.assertRaises(OSError, obj.seek, 0)
502
503 if writable and seekable:
504 obj.truncate()
505 obj.truncate(0)
506 else:
507 self.assertRaises(OSError, obj.truncate)
508 self.assertRaises(OSError, obj.truncate, 0)
509
Antoine Pitrou13348842012-01-29 18:36:34 +0100510 def test_open_handles_NUL_chars(self):
511 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300512 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100513
514 bytes_fn = bytes(fn_with_NUL, 'ascii')
515 with warnings.catch_warnings():
516 warnings.simplefilter("ignore", DeprecationWarning)
517 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100518
Guido van Rossum28524c72007-02-27 05:47:44 +0000519 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000520 with self.open(support.TESTFN, "wb", buffering=0) as f:
521 self.assertEqual(f.readable(), False)
522 self.assertEqual(f.writable(), True)
523 self.assertEqual(f.seekable(), True)
524 self.write_ops(f)
525 with self.open(support.TESTFN, "rb", buffering=0) as f:
526 self.assertEqual(f.readable(), True)
527 self.assertEqual(f.writable(), False)
528 self.assertEqual(f.seekable(), True)
529 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000530
Guido van Rossum87429772007-04-10 21:06:59 +0000531 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000532 with self.open(support.TESTFN, "wb") as f:
533 self.assertEqual(f.readable(), False)
534 self.assertEqual(f.writable(), True)
535 self.assertEqual(f.seekable(), True)
536 self.write_ops(f)
537 with self.open(support.TESTFN, "rb") as f:
538 self.assertEqual(f.readable(), True)
539 self.assertEqual(f.writable(), False)
540 self.assertEqual(f.seekable(), True)
541 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000542
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000543 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000544 with self.open(support.TESTFN, "wb") as f:
545 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
546 with self.open(support.TESTFN, "rb") as f:
547 self.assertEqual(f.readline(), b"abc\n")
548 self.assertEqual(f.readline(10), b"def\n")
549 self.assertEqual(f.readline(2), b"xy")
550 self.assertEqual(f.readline(4), b"zzy\n")
551 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000552 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000553 self.assertRaises(TypeError, f.readline, 5.3)
554 with self.open(support.TESTFN, "r") as f:
555 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000556
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300557 def test_readline_nonsizeable(self):
558 # Issue #30061
559 # Crash when readline() returns an object without __len__
560 class R(self.IOBase):
561 def readline(self):
562 return None
563 self.assertRaises((TypeError, StopIteration), next, R())
564
565 def test_next_nonsizeable(self):
566 # Issue #30061
567 # Crash when __next__() returns an object without __len__
568 class R(self.IOBase):
569 def __next__(self):
570 return None
571 self.assertRaises(TypeError, R().readlines, 1)
572
Guido van Rossum28524c72007-02-27 05:47:44 +0000573 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000574 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000575 self.write_ops(f)
576 data = f.getvalue()
577 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000578 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000579 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000580
Guido van Rossum53807da2007-04-10 19:01:47 +0000581 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000582 # On Windows and Mac OSX this test comsumes large resources; It takes
Victor Stinner8c663fd2017-11-08 14:44:44 -0800583 # a long time to build the >2 GiB file and takes >2 GiB of disk space
Guido van Rossum34d69e52007-04-10 20:08:41 +0000584 # therefore the resource must be enabled to run this test.
585 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600586 support.requires(
587 'largefile',
588 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000589 with self.open(support.TESTFN, "w+b", 0) as f:
590 self.large_file_ops(f)
591 with self.open(support.TESTFN, "w+b") as f:
592 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000593
594 def test_with_open(self):
595 for bufsize in (0, 1, 100):
596 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000597 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000598 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000599 self.assertEqual(f.closed, True)
600 f = None
601 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000602 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000603 1/0
604 except ZeroDivisionError:
605 self.assertEqual(f.closed, True)
606 else:
607 self.fail("1/0 didn't raise an exception")
608
Antoine Pitrou08838b62009-01-21 00:55:13 +0000609 # issue 5008
610 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000611 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000612 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000613 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000614 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000615 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000616 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000617 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300618 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000619
Guido van Rossum87429772007-04-10 21:06:59 +0000620 def test_destructor(self):
621 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000622 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000623 def __del__(self):
624 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000625 try:
626 f = super().__del__
627 except AttributeError:
628 pass
629 else:
630 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000631 def close(self):
632 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000633 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000634 def flush(self):
635 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000636 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000637 with support.check_warnings(('', ResourceWarning)):
638 f = MyFileIO(support.TESTFN, "wb")
639 f.write(b"xxx")
640 del f
641 support.gc_collect()
642 self.assertEqual(record, [1, 2, 3])
643 with self.open(support.TESTFN, "rb") as f:
644 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000645
646 def _check_base_destructor(self, base):
647 record = []
648 class MyIO(base):
649 def __init__(self):
650 # This exercises the availability of attributes on object
651 # destruction.
652 # (in the C version, close() is called by the tp_dealloc
653 # function, not by __del__)
654 self.on_del = 1
655 self.on_close = 2
656 self.on_flush = 3
657 def __del__(self):
658 record.append(self.on_del)
659 try:
660 f = super().__del__
661 except AttributeError:
662 pass
663 else:
664 f()
665 def close(self):
666 record.append(self.on_close)
667 super().close()
668 def flush(self):
669 record.append(self.on_flush)
670 super().flush()
671 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000672 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000673 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000674 self.assertEqual(record, [1, 2, 3])
675
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000676 def test_IOBase_destructor(self):
677 self._check_base_destructor(self.IOBase)
678
679 def test_RawIOBase_destructor(self):
680 self._check_base_destructor(self.RawIOBase)
681
682 def test_BufferedIOBase_destructor(self):
683 self._check_base_destructor(self.BufferedIOBase)
684
685 def test_TextIOBase_destructor(self):
686 self._check_base_destructor(self.TextIOBase)
687
Guido van Rossum87429772007-04-10 21:06:59 +0000688 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000689 with self.open(support.TESTFN, "wb") as f:
690 f.write(b"xxx")
691 with self.open(support.TESTFN, "rb") as f:
692 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000693
Guido van Rossumd4103952007-04-12 05:44:49 +0000694 def test_array_writes(self):
695 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000696 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000697 def check(f):
698 with f:
699 self.assertEqual(f.write(a), n)
700 f.writelines((a,))
701 check(self.BytesIO())
702 check(self.FileIO(support.TESTFN, "w"))
703 check(self.BufferedWriter(self.MockRawIO()))
704 check(self.BufferedRandom(self.MockRawIO()))
705 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000706
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000707 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000708 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000709 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000710
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000711 def test_read_closed(self):
712 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000713 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000714 with self.open(support.TESTFN, "r") as f:
715 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000716 self.assertEqual(file.read(), "egg\n")
717 file.seek(0)
718 file.close()
719 self.assertRaises(ValueError, file.read)
720
721 def test_no_closefd_with_filename(self):
722 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000723 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000724
725 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000726 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000727 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000728 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000729 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000730 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000731 self.assertEqual(file.buffer.raw.closefd, False)
732
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000733 def test_garbage_collection(self):
734 # FileIO objects are collected, and collecting them flushes
735 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000736 with support.check_warnings(('', ResourceWarning)):
737 f = self.FileIO(support.TESTFN, "wb")
738 f.write(b"abcxxx")
739 f.f = f
740 wr = weakref.ref(f)
741 del f
742 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300743 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000744 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000745 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000746
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000747 def test_unbounded_file(self):
748 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
749 zero = "/dev/zero"
750 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000751 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000752 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000753 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000754 if support.real_max_memuse < support._2G:
Victor Stinner8c663fd2017-11-08 14:44:44 -0800755 self.skipTest("test requires at least 2 GiB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000756 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000757 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000758 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000759 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000760 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000761 self.assertRaises(OverflowError, f.read)
762
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200763 def check_flush_error_on_close(self, *args, **kwargs):
764 # Test that the file is closed despite failed flush
765 # and that flush() is called before file closed.
766 f = self.open(*args, **kwargs)
767 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000768 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200769 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200770 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000771 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200772 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600773 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200774 self.assertTrue(closed) # flush() called
775 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200776 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200777
778 def test_flush_error_on_close(self):
779 # raw file
780 # Issue #5700: io.FileIO calls flush() after file closed
781 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
782 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
783 self.check_flush_error_on_close(fd, 'wb', buffering=0)
784 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
785 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
786 os.close(fd)
787 # buffered io
788 self.check_flush_error_on_close(support.TESTFN, 'wb')
789 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
790 self.check_flush_error_on_close(fd, 'wb')
791 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
792 self.check_flush_error_on_close(fd, 'wb', closefd=False)
793 os.close(fd)
794 # text io
795 self.check_flush_error_on_close(support.TESTFN, 'w')
796 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
797 self.check_flush_error_on_close(fd, 'w')
798 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
799 self.check_flush_error_on_close(fd, 'w', closefd=False)
800 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000801
802 def test_multi_close(self):
803 f = self.open(support.TESTFN, "wb", buffering=0)
804 f.close()
805 f.close()
806 f.close()
807 self.assertRaises(ValueError, f.flush)
808
Antoine Pitrou328ec742010-09-14 18:37:24 +0000809 def test_RawIOBase_read(self):
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530810 # Exercise the default limited RawIOBase.read(n) implementation (which
811 # calls readinto() internally).
Antoine Pitrou328ec742010-09-14 18:37:24 +0000812 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
813 self.assertEqual(rawio.read(2), b"ab")
814 self.assertEqual(rawio.read(2), b"c")
815 self.assertEqual(rawio.read(2), b"d")
816 self.assertEqual(rawio.read(2), None)
817 self.assertEqual(rawio.read(2), b"ef")
818 self.assertEqual(rawio.read(2), b"g")
819 self.assertEqual(rawio.read(2), None)
820 self.assertEqual(rawio.read(2), b"")
821
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400822 def test_types_have_dict(self):
823 test = (
824 self.IOBase(),
825 self.RawIOBase(),
826 self.TextIOBase(),
827 self.StringIO(),
828 self.BytesIO()
829 )
830 for obj in test:
831 self.assertTrue(hasattr(obj, "__dict__"))
832
Ross Lagerwall59142db2011-10-31 20:34:46 +0200833 def test_opener(self):
834 with self.open(support.TESTFN, "w") as f:
835 f.write("egg\n")
836 fd = os.open(support.TESTFN, os.O_RDONLY)
837 def opener(path, flags):
838 return fd
839 with self.open("non-existent", "r", opener=opener) as f:
840 self.assertEqual(f.read(), "egg\n")
841
Barry Warsaw480e2852016-06-08 17:47:26 -0400842 def test_bad_opener_negative_1(self):
843 # Issue #27066.
844 def badopener(fname, flags):
845 return -1
846 with self.assertRaises(ValueError) as cm:
847 open('non-existent', 'r', opener=badopener)
848 self.assertEqual(str(cm.exception), 'opener returned -1')
849
850 def test_bad_opener_other_negative(self):
851 # Issue #27066.
852 def badopener(fname, flags):
853 return -2
854 with self.assertRaises(ValueError) as cm:
855 open('non-existent', 'r', opener=badopener)
856 self.assertEqual(str(cm.exception), 'opener returned -2')
857
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200858 def test_fileio_closefd(self):
859 # Issue #4841
860 with self.open(__file__, 'rb') as f1, \
861 self.open(__file__, 'rb') as f2:
862 fileio = self.FileIO(f1.fileno(), closefd=False)
863 # .__init__() must not close f1
864 fileio.__init__(f2.fileno(), closefd=False)
865 f1.readline()
866 # .close() must not close f2
867 fileio.close()
868 f2.readline()
869
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300870 def test_nonbuffered_textio(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200871 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300872 with self.assertRaises(ValueError):
873 self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300874
875 def test_invalid_newline(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200876 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300877 with self.assertRaises(ValueError):
878 self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300879
Martin Panter6bb91f32016-05-28 00:41:57 +0000880 def test_buffered_readinto_mixin(self):
881 # Test the implementation provided by BufferedIOBase
882 class Stream(self.BufferedIOBase):
883 def read(self, size):
884 return b"12345"
885 read1 = read
886 stream = Stream()
887 for method in ("readinto", "readinto1"):
888 with self.subTest(method):
889 buffer = byteslike(5)
890 self.assertEqual(getattr(stream, method)(buffer), 5)
891 self.assertEqual(bytes(buffer), b"12345")
892
Ethan Furmand62548a2016-06-04 14:38:43 -0700893 def test_fspath_support(self):
894 class PathLike:
895 def __init__(self, path):
896 self.path = path
897
898 def __fspath__(self):
899 return self.path
900
901 def check_path_succeeds(path):
902 with self.open(path, "w") as f:
903 f.write("egg\n")
904
905 with self.open(path, "r") as f:
906 self.assertEqual(f.read(), "egg\n")
907
908 check_path_succeeds(PathLike(support.TESTFN))
909 check_path_succeeds(PathLike(support.TESTFN.encode('utf-8')))
910
911 bad_path = PathLike(TypeError)
Brett Cannonc78ca1e2016-06-24 12:03:43 -0700912 with self.assertRaises(TypeError):
Ethan Furmand62548a2016-06-04 14:38:43 -0700913 self.open(bad_path, 'w')
914
915 # ensure that refcounting is correct with some error conditions
916 with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
917 self.open(PathLike(support.TESTFN), 'rwxa')
918
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530919 def test_RawIOBase_readall(self):
920 # Exercise the default unlimited RawIOBase.read() and readall()
921 # implementations.
922 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
923 self.assertEqual(rawio.read(), b"abcdefg")
924 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
925 self.assertEqual(rawio.readall(), b"abcdefg")
926
927 def test_BufferedIOBase_readinto(self):
928 # Exercise the default BufferedIOBase.readinto() and readinto1()
929 # implementations (which call read() or read1() internally).
930 class Reader(self.BufferedIOBase):
931 def __init__(self, avail):
932 self.avail = avail
933 def read(self, size):
934 result = self.avail[:size]
935 self.avail = self.avail[size:]
936 return result
937 def read1(self, size):
938 """Returns no more than 5 bytes at once"""
939 return self.read(min(size, 5))
940 tests = (
941 # (test method, total data available, read buffer size, expected
942 # read size)
943 ("readinto", 10, 5, 5),
944 ("readinto", 10, 6, 6), # More than read1() can return
945 ("readinto", 5, 6, 5), # Buffer larger than total available
946 ("readinto", 6, 7, 6),
947 ("readinto", 10, 0, 0), # Empty buffer
948 ("readinto1", 10, 5, 5), # Result limited to single read1() call
949 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return
950 ("readinto1", 5, 6, 5), # Buffer larger than total available
951 ("readinto1", 6, 7, 5),
952 ("readinto1", 10, 0, 0), # Empty buffer
953 )
954 UNUSED_BYTE = 0x81
955 for test in tests:
956 with self.subTest(test):
957 method, avail, request, result = test
958 reader = Reader(bytes(range(avail)))
959 buffer = bytearray((UNUSED_BYTE,) * request)
960 method = getattr(reader, method)
961 self.assertEqual(method(buffer), result)
962 self.assertEqual(len(buffer), request)
963 self.assertSequenceEqual(buffer[:result], range(result))
964 unused = (UNUSED_BYTE,) * (request - result)
965 self.assertSequenceEqual(buffer[result:], unused)
966 self.assertEqual(len(reader.avail), avail - result)
967
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200968
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000969class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200970
971 def test_IOBase_finalize(self):
972 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
973 # class which inherits IOBase and an object of this class are caught
974 # in a reference cycle and close() is already in the method cache.
975 class MyIO(self.IOBase):
976 def close(self):
977 pass
978
979 # create an instance to populate the method cache
980 MyIO()
981 obj = MyIO()
982 obj.obj = obj
983 wr = weakref.ref(obj)
984 del MyIO
985 del obj
986 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300987 self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000988
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000989class PyIOTest(IOTest):
990 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000991
Guido van Rossuma9e20242007-03-08 00:43:48 +0000992
Gregory P. Smith1bef9072015-04-14 13:24:34 -0700993@support.cpython_only
994class APIMismatchTest(unittest.TestCase):
Gregory P. Smith054b0652015-04-14 12:58:05 -0700995
Gregory P. Smith054b0652015-04-14 12:58:05 -0700996 def test_RawIOBase_io_in_pyio_match(self):
997 """Test that pyio RawIOBase class has all c RawIOBase methods"""
Antoine Pitrou45d61562015-05-20 21:50:59 +0200998 mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
999 ignore=('__weakref__',))
Gregory P. Smith054b0652015-04-14 12:58:05 -07001000 self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
1001
1002 def test_RawIOBase_pyio_in_io_match(self):
1003 """Test that c RawIOBase class has all pyio RawIOBase methods"""
1004 mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
1005 self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
1006
1007
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001008class CommonBufferedTests:
1009 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
1010
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001011 def test_detach(self):
1012 raw = self.MockRawIO()
1013 buf = self.tp(raw)
1014 self.assertIs(buf.detach(), raw)
1015 self.assertRaises(ValueError, buf.detach)
1016
Benjamin Peterson10e76b62014-12-21 20:51:50 -06001017 repr(buf) # Should still work
1018
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001019 def test_fileno(self):
1020 rawio = self.MockRawIO()
1021 bufio = self.tp(rawio)
1022
Ezio Melottib3aedd42010-11-20 19:04:17 +00001023 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001024
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001025 def test_invalid_args(self):
1026 rawio = self.MockRawIO()
1027 bufio = self.tp(rawio)
1028 # Invalid whence
1029 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +02001030 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001031
1032 def test_override_destructor(self):
1033 tp = self.tp
1034 record = []
1035 class MyBufferedIO(tp):
1036 def __del__(self):
1037 record.append(1)
1038 try:
1039 f = super().__del__
1040 except AttributeError:
1041 pass
1042 else:
1043 f()
1044 def close(self):
1045 record.append(2)
1046 super().close()
1047 def flush(self):
1048 record.append(3)
1049 super().flush()
1050 rawio = self.MockRawIO()
1051 bufio = MyBufferedIO(rawio)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001052 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001053 support.gc_collect()
Martin Panter754aab22016-03-31 07:21:56 +00001054 self.assertEqual(record, [1, 2, 3])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001055
1056 def test_context_manager(self):
1057 # Test usability as a context manager
1058 rawio = self.MockRawIO()
1059 bufio = self.tp(rawio)
1060 def _with():
1061 with bufio:
1062 pass
1063 _with()
1064 # bufio should now be closed, and using it a second time should raise
1065 # a ValueError.
1066 self.assertRaises(ValueError, _with)
1067
1068 def test_error_through_destructor(self):
1069 # Test that the exception state is not modified by a destructor,
1070 # even if close() fails.
1071 rawio = self.CloseFailureIO()
1072 def f():
1073 self.tp(rawio).xyzzy
1074 with support.captured_output("stderr") as s:
1075 self.assertRaises(AttributeError, f)
1076 s = s.getvalue().strip()
1077 if s:
1078 # The destructor *may* have printed an unraisable error, check it
1079 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001080 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001081 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +00001082
Antoine Pitrou716c4442009-05-23 19:04:03 +00001083 def test_repr(self):
1084 raw = self.MockRawIO()
1085 b = self.tp(raw)
Serhiy Storchaka521e5862014-07-22 15:00:37 +03001086 clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001087 self.assertEqual(repr(b), "<%s>" % clsname)
1088 raw.name = "dummy"
1089 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
1090 raw.name = b"dummy"
1091 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
1092
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02001093 def test_recursive_repr(self):
1094 # Issue #25455
1095 raw = self.MockRawIO()
1096 b = self.tp(raw)
1097 with support.swap_attr(raw, 'name', b):
1098 try:
1099 repr(b) # Should not crash
1100 except RuntimeError:
1101 pass
1102
Antoine Pitrou6be88762010-05-03 16:48:20 +00001103 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001104 # Test that buffered file is closed despite failed flush
1105 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00001106 raw = self.MockRawIO()
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001107 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00001108 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001109 closed[:] = [b.closed, raw.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001110 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00001111 raw.flush = bad_flush
1112 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001113 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001114 self.assertTrue(b.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001115 self.assertTrue(raw.closed)
1116 self.assertTrue(closed) # flush() called
1117 self.assertFalse(closed[0]) # flush() called before file closed
1118 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02001119 raw.flush = lambda: None # break reference loop
Benjamin Peterson68623612012-12-20 11:53:11 -06001120
1121 def test_close_error_on_close(self):
1122 raw = self.MockRawIO()
1123 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001124 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -06001125 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001126 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -06001127 raw.close = bad_close
1128 b = self.tp(raw)
1129 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001130 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001131 b.close()
1132 self.assertEqual(err.exception.args, ('close',))
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001133 self.assertIsInstance(err.exception.__context__, OSError)
Benjamin Peterson68623612012-12-20 11:53:11 -06001134 self.assertEqual(err.exception.__context__.args, ('flush',))
1135 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00001136
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001137 def test_nonnormalized_close_error_on_close(self):
1138 # Issue #21677
1139 raw = self.MockRawIO()
1140 def bad_flush():
1141 raise non_existing_flush
1142 def bad_close():
1143 raise non_existing_close
1144 raw.close = bad_close
1145 b = self.tp(raw)
1146 b.flush = bad_flush
1147 with self.assertRaises(NameError) as err: # exception not swallowed
1148 b.close()
1149 self.assertIn('non_existing_close', str(err.exception))
1150 self.assertIsInstance(err.exception.__context__, NameError)
1151 self.assertIn('non_existing_flush', str(err.exception.__context__))
1152 self.assertFalse(b.closed)
1153
Antoine Pitrou6be88762010-05-03 16:48:20 +00001154 def test_multi_close(self):
1155 raw = self.MockRawIO()
1156 b = self.tp(raw)
1157 b.close()
1158 b.close()
1159 b.close()
1160 self.assertRaises(ValueError, b.flush)
1161
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001162 def test_unseekable(self):
1163 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1164 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1165 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1166
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00001167 def test_readonly_attributes(self):
1168 raw = self.MockRawIO()
1169 buf = self.tp(raw)
1170 x = self.MockRawIO()
1171 with self.assertRaises(AttributeError):
1172 buf.raw = x
1173
Guido van Rossum78892e42007-04-06 17:31:18 +00001174
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001175class SizeofTest:
1176
1177 @support.cpython_only
1178 def test_sizeof(self):
1179 bufsize1 = 4096
1180 bufsize2 = 8192
1181 rawio = self.MockRawIO()
1182 bufio = self.tp(rawio, buffer_size=bufsize1)
1183 size = sys.getsizeof(bufio) - bufsize1
1184 rawio = self.MockRawIO()
1185 bufio = self.tp(rawio, buffer_size=bufsize2)
1186 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
1187
Jesus Ceadc469452012-10-04 12:37:56 +02001188 @support.cpython_only
1189 def test_buffer_freeing(self) :
1190 bufsize = 4096
1191 rawio = self.MockRawIO()
1192 bufio = self.tp(rawio, buffer_size=bufsize)
1193 size = sys.getsizeof(bufio) - bufsize
1194 bufio.close()
1195 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001196
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001197class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1198 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001199
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001200 def test_constructor(self):
1201 rawio = self.MockRawIO([b"abc"])
1202 bufio = self.tp(rawio)
1203 bufio.__init__(rawio)
1204 bufio.__init__(rawio, buffer_size=1024)
1205 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001206 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001207 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1208 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1209 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1210 rawio = self.MockRawIO([b"abc"])
1211 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001212 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001213
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001214 def test_uninitialized(self):
1215 bufio = self.tp.__new__(self.tp)
1216 del bufio
1217 bufio = self.tp.__new__(self.tp)
1218 self.assertRaisesRegex((ValueError, AttributeError),
1219 'uninitialized|has no attribute',
1220 bufio.read, 0)
1221 bufio.__init__(self.MockRawIO())
1222 self.assertEqual(bufio.read(0), b'')
1223
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001224 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001225 for arg in (None, 7):
1226 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1227 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001228 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001229 # Invalid args
1230 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001231
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001232 def test_read1(self):
1233 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1234 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001235 self.assertEqual(b"a", bufio.read(1))
1236 self.assertEqual(b"b", bufio.read1(1))
1237 self.assertEqual(rawio._reads, 1)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001238 self.assertEqual(b"", bufio.read1(0))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001239 self.assertEqual(b"c", bufio.read1(100))
1240 self.assertEqual(rawio._reads, 1)
1241 self.assertEqual(b"d", bufio.read1(100))
1242 self.assertEqual(rawio._reads, 2)
1243 self.assertEqual(b"efg", bufio.read1(100))
1244 self.assertEqual(rawio._reads, 3)
1245 self.assertEqual(b"", bufio.read1(100))
1246 self.assertEqual(rawio._reads, 4)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001247
1248 def test_read1_arbitrary(self):
1249 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1250 bufio = self.tp(rawio)
1251 self.assertEqual(b"a", bufio.read(1))
1252 self.assertEqual(b"bc", bufio.read1())
1253 self.assertEqual(b"d", bufio.read1())
1254 self.assertEqual(b"efg", bufio.read1(-1))
1255 self.assertEqual(rawio._reads, 3)
1256 self.assertEqual(b"", bufio.read1())
1257 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001258
1259 def test_readinto(self):
1260 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1261 bufio = self.tp(rawio)
1262 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001263 self.assertEqual(bufio.readinto(b), 2)
1264 self.assertEqual(b, b"ab")
1265 self.assertEqual(bufio.readinto(b), 2)
1266 self.assertEqual(b, b"cd")
1267 self.assertEqual(bufio.readinto(b), 2)
1268 self.assertEqual(b, b"ef")
1269 self.assertEqual(bufio.readinto(b), 1)
1270 self.assertEqual(b, b"gf")
1271 self.assertEqual(bufio.readinto(b), 0)
1272 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +02001273 rawio = self.MockRawIO((b"abc", None))
1274 bufio = self.tp(rawio)
1275 self.assertEqual(bufio.readinto(b), 2)
1276 self.assertEqual(b, b"ab")
1277 self.assertEqual(bufio.readinto(b), 1)
1278 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001279
Benjamin Petersona96fea02014-06-22 14:17:44 -07001280 def test_readinto1(self):
1281 buffer_size = 10
1282 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1283 bufio = self.tp(rawio, buffer_size=buffer_size)
1284 b = bytearray(2)
1285 self.assertEqual(bufio.peek(3), b'abc')
1286 self.assertEqual(rawio._reads, 1)
1287 self.assertEqual(bufio.readinto1(b), 2)
1288 self.assertEqual(b, b"ab")
1289 self.assertEqual(rawio._reads, 1)
1290 self.assertEqual(bufio.readinto1(b), 1)
1291 self.assertEqual(b[:1], b"c")
1292 self.assertEqual(rawio._reads, 1)
1293 self.assertEqual(bufio.readinto1(b), 2)
1294 self.assertEqual(b, b"de")
1295 self.assertEqual(rawio._reads, 2)
1296 b = bytearray(2*buffer_size)
1297 self.assertEqual(bufio.peek(3), b'fgh')
1298 self.assertEqual(rawio._reads, 3)
1299 self.assertEqual(bufio.readinto1(b), 6)
1300 self.assertEqual(b[:6], b"fghjkl")
1301 self.assertEqual(rawio._reads, 4)
1302
1303 def test_readinto_array(self):
1304 buffer_size = 60
1305 data = b"a" * 26
1306 rawio = self.MockRawIO((data,))
1307 bufio = self.tp(rawio, buffer_size=buffer_size)
1308
1309 # Create an array with element size > 1 byte
1310 b = array.array('i', b'x' * 32)
1311 assert len(b) != 16
1312
1313 # Read into it. We should get as many *bytes* as we can fit into b
1314 # (which is more than the number of elements)
1315 n = bufio.readinto(b)
1316 self.assertGreater(n, len(b))
1317
1318 # Check that old contents of b are preserved
1319 bm = memoryview(b).cast('B')
1320 self.assertLess(n, len(bm))
1321 self.assertEqual(bm[:n], data[:n])
1322 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1323
1324 def test_readinto1_array(self):
1325 buffer_size = 60
1326 data = b"a" * 26
1327 rawio = self.MockRawIO((data,))
1328 bufio = self.tp(rawio, buffer_size=buffer_size)
1329
1330 # Create an array with element size > 1 byte
1331 b = array.array('i', b'x' * 32)
1332 assert len(b) != 16
1333
1334 # Read into it. We should get as many *bytes* as we can fit into b
1335 # (which is more than the number of elements)
1336 n = bufio.readinto1(b)
1337 self.assertGreater(n, len(b))
1338
1339 # Check that old contents of b are preserved
1340 bm = memoryview(b).cast('B')
1341 self.assertLess(n, len(bm))
1342 self.assertEqual(bm[:n], data[:n])
1343 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1344
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001345 def test_readlines(self):
1346 def bufio():
1347 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1348 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001349 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1350 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1351 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001352
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001353 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001354 data = b"abcdefghi"
1355 dlen = len(data)
1356
1357 tests = [
1358 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1359 [ 100, [ 3, 3, 3], [ dlen ] ],
1360 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1361 ]
1362
1363 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001364 rawio = self.MockFileIO(data)
1365 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001366 pos = 0
1367 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001368 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001369 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001370 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001371 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001372
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001373 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +00001374 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001375 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1376 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001377 self.assertEqual(b"abcd", bufio.read(6))
1378 self.assertEqual(b"e", bufio.read(1))
1379 self.assertEqual(b"fg", bufio.read())
1380 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +02001381 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001382 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +00001383
Victor Stinnera80987f2011-05-25 22:47:16 +02001384 rawio = self.MockRawIO((b"a", None, None))
1385 self.assertEqual(b"a", rawio.readall())
1386 self.assertIsNone(rawio.readall())
1387
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001388 def test_read_past_eof(self):
1389 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1390 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001391
Ezio Melottib3aedd42010-11-20 19:04:17 +00001392 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001393
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001394 def test_read_all(self):
1395 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1396 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001397
Ezio Melottib3aedd42010-11-20 19:04:17 +00001398 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001399
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001400 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001401 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001402 try:
1403 # Write out many bytes with exactly the same number of 0's,
1404 # 1's... 255's. This will help us check that concurrent reading
1405 # doesn't duplicate or forget contents.
1406 N = 1000
1407 l = list(range(256)) * N
1408 random.shuffle(l)
1409 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001410 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +00001411 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001412 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001413 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001414 errors = []
1415 results = []
1416 def f():
1417 try:
1418 # Intra-buffer read then buffer-flushing read
1419 for n in cycle([1, 19]):
1420 s = bufio.read(n)
1421 if not s:
1422 break
1423 # list.append() is atomic
1424 results.append(s)
1425 except Exception as e:
1426 errors.append(e)
1427 raise
1428 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001429 with support.start_threads(threads):
1430 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001431 self.assertFalse(errors,
1432 "the following exceptions were caught: %r" % errors)
1433 s = b''.join(results)
1434 for i in range(256):
1435 c = bytes(bytearray([i]))
1436 self.assertEqual(s.count(c), N)
1437 finally:
1438 support.unlink(support.TESTFN)
1439
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001440 def test_unseekable(self):
1441 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1442 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1443 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1444 bufio.read(1)
1445 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1446 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1447
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001448 def test_misbehaved_io(self):
1449 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1450 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001451 self.assertRaises(OSError, bufio.seek, 0)
1452 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001453
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001454 def test_no_extraneous_read(self):
1455 # Issue #9550; when the raw IO object has satisfied the read request,
1456 # we should not issue any additional reads, otherwise it may block
1457 # (e.g. socket).
1458 bufsize = 16
1459 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1460 rawio = self.MockRawIO([b"x" * n])
1461 bufio = self.tp(rawio, bufsize)
1462 self.assertEqual(bufio.read(n), b"x" * n)
1463 # Simple case: one raw read is enough to satisfy the request.
1464 self.assertEqual(rawio._extraneous_reads, 0,
1465 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1466 # A more complex case where two raw reads are needed to satisfy
1467 # the request.
1468 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1469 bufio = self.tp(rawio, bufsize)
1470 self.assertEqual(bufio.read(n), b"x" * n)
1471 self.assertEqual(rawio._extraneous_reads, 0,
1472 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1473
Berker Peksagd10d6ae2015-05-12 17:01:05 +03001474 def test_read_on_closed(self):
1475 # Issue #23796
1476 b = io.BufferedReader(io.BytesIO(b"12"))
1477 b.read(1)
1478 b.close()
1479 self.assertRaises(ValueError, b.peek)
1480 self.assertRaises(ValueError, b.read1, 1)
1481
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001482
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001483class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001484 tp = io.BufferedReader
1485
1486 def test_constructor(self):
1487 BufferedReaderTest.test_constructor(self)
1488 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08001489 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001490 if sys.maxsize > 0x7FFFFFFF:
1491 rawio = self.MockRawIO()
1492 bufio = self.tp(rawio)
1493 self.assertRaises((OverflowError, MemoryError, ValueError),
1494 bufio.__init__, rawio, sys.maxsize)
1495
1496 def test_initialization(self):
1497 rawio = self.MockRawIO([b"abc"])
1498 bufio = self.tp(rawio)
1499 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1500 self.assertRaises(ValueError, bufio.read)
1501 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1502 self.assertRaises(ValueError, bufio.read)
1503 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1504 self.assertRaises(ValueError, bufio.read)
1505
1506 def test_misbehaved_io_read(self):
1507 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1508 bufio = self.tp(rawio)
1509 # _pyio.BufferedReader seems to implement reading different, so that
1510 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001511 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001512
1513 def test_garbage_collection(self):
1514 # C BufferedReader objects are collected.
1515 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001516 with support.check_warnings(('', ResourceWarning)):
1517 rawio = self.FileIO(support.TESTFN, "w+b")
1518 f = self.tp(rawio)
1519 f.f = f
1520 wr = weakref.ref(f)
1521 del f
1522 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001523 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001524
R David Murray67bfe802013-02-23 21:51:05 -05001525 def test_args_error(self):
1526 # Issue #17275
1527 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1528 self.tp(io.BytesIO(), 1024, 1024, 1024)
1529
1530
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001531class PyBufferedReaderTest(BufferedReaderTest):
1532 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001533
Guido van Rossuma9e20242007-03-08 00:43:48 +00001534
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001535class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1536 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001537
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001538 def test_constructor(self):
1539 rawio = self.MockRawIO()
1540 bufio = self.tp(rawio)
1541 bufio.__init__(rawio)
1542 bufio.__init__(rawio, buffer_size=1024)
1543 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001544 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001545 bufio.flush()
1546 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1547 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1548 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1549 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001550 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001551 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001552 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001553
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001554 def test_uninitialized(self):
1555 bufio = self.tp.__new__(self.tp)
1556 del bufio
1557 bufio = self.tp.__new__(self.tp)
1558 self.assertRaisesRegex((ValueError, AttributeError),
1559 'uninitialized|has no attribute',
1560 bufio.write, b'')
1561 bufio.__init__(self.MockRawIO())
1562 self.assertEqual(bufio.write(b''), 0)
1563
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001564 def test_detach_flush(self):
1565 raw = self.MockRawIO()
1566 buf = self.tp(raw)
1567 buf.write(b"howdy!")
1568 self.assertFalse(raw._write_stack)
1569 buf.detach()
1570 self.assertEqual(raw._write_stack, [b"howdy!"])
1571
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001572 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001573 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001574 writer = self.MockRawIO()
1575 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001576 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001577 self.assertFalse(writer._write_stack)
Martin Panter6bb91f32016-05-28 00:41:57 +00001578 buffer = bytearray(b"def")
1579 bufio.write(buffer)
1580 buffer[:] = b"***" # Overwrite our copy of the data
1581 bufio.flush()
1582 self.assertEqual(b"".join(writer._write_stack), b"abcdef")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001583
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001584 def test_write_overflow(self):
1585 writer = self.MockRawIO()
1586 bufio = self.tp(writer, 8)
1587 contents = b"abcdefghijklmnop"
1588 for n in range(0, len(contents), 3):
1589 bufio.write(contents[n:n+3])
1590 flushed = b"".join(writer._write_stack)
1591 # At least (total - 8) bytes were implicitly flushed, perhaps more
1592 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001593 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001594
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001595 def check_writes(self, intermediate_func):
1596 # Lots of writes, test the flushed output is as expected.
1597 contents = bytes(range(256)) * 1000
1598 n = 0
1599 writer = self.MockRawIO()
1600 bufio = self.tp(writer, 13)
1601 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1602 def gen_sizes():
1603 for size in count(1):
1604 for i in range(15):
1605 yield size
1606 sizes = gen_sizes()
1607 while n < len(contents):
1608 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001609 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001610 intermediate_func(bufio)
1611 n += size
1612 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001613 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001614
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001615 def test_writes(self):
1616 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001617
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001618 def test_writes_and_flushes(self):
1619 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001620
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001621 def test_writes_and_seeks(self):
1622 def _seekabs(bufio):
1623 pos = bufio.tell()
1624 bufio.seek(pos + 1, 0)
1625 bufio.seek(pos - 1, 0)
1626 bufio.seek(pos, 0)
1627 self.check_writes(_seekabs)
1628 def _seekrel(bufio):
1629 pos = bufio.seek(0, 1)
1630 bufio.seek(+1, 1)
1631 bufio.seek(-1, 1)
1632 bufio.seek(pos, 0)
1633 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001634
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001635 def test_writes_and_truncates(self):
1636 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001637
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001638 def test_write_non_blocking(self):
1639 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001640 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001641
Ezio Melottib3aedd42010-11-20 19:04:17 +00001642 self.assertEqual(bufio.write(b"abcd"), 4)
1643 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001644 # 1 byte will be written, the rest will be buffered
1645 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001646 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001647
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001648 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1649 raw.block_on(b"0")
1650 try:
1651 bufio.write(b"opqrwxyz0123456789")
1652 except self.BlockingIOError as e:
1653 written = e.characters_written
1654 else:
1655 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001656 self.assertEqual(written, 16)
1657 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001658 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001659
Ezio Melottib3aedd42010-11-20 19:04:17 +00001660 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001661 s = raw.pop_written()
1662 # Previously buffered bytes were flushed
1663 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001664
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001665 def test_write_and_rewind(self):
1666 raw = io.BytesIO()
1667 bufio = self.tp(raw, 4)
1668 self.assertEqual(bufio.write(b"abcdef"), 6)
1669 self.assertEqual(bufio.tell(), 6)
1670 bufio.seek(0, 0)
1671 self.assertEqual(bufio.write(b"XY"), 2)
1672 bufio.seek(6, 0)
1673 self.assertEqual(raw.getvalue(), b"XYcdef")
1674 self.assertEqual(bufio.write(b"123456"), 6)
1675 bufio.flush()
1676 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001677
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001678 def test_flush(self):
1679 writer = self.MockRawIO()
1680 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001681 bufio.write(b"abc")
1682 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001683 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001684
Antoine Pitrou131a4892012-10-16 22:57:11 +02001685 def test_writelines(self):
1686 l = [b'ab', b'cd', b'ef']
1687 writer = self.MockRawIO()
1688 bufio = self.tp(writer, 8)
1689 bufio.writelines(l)
1690 bufio.flush()
1691 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1692
1693 def test_writelines_userlist(self):
1694 l = UserList([b'ab', b'cd', b'ef'])
1695 writer = self.MockRawIO()
1696 bufio = self.tp(writer, 8)
1697 bufio.writelines(l)
1698 bufio.flush()
1699 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1700
1701 def test_writelines_error(self):
1702 writer = self.MockRawIO()
1703 bufio = self.tp(writer, 8)
1704 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1705 self.assertRaises(TypeError, bufio.writelines, None)
1706 self.assertRaises(TypeError, bufio.writelines, 'abc')
1707
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001708 def test_destructor(self):
1709 writer = self.MockRawIO()
1710 bufio = self.tp(writer, 8)
1711 bufio.write(b"abc")
1712 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001713 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001714 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001715
1716 def test_truncate(self):
1717 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001718 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001719 bufio = self.tp(raw, 8)
1720 bufio.write(b"abcdef")
1721 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001722 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001723 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001724 self.assertEqual(f.read(), b"abc")
1725
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001726 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001727 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001728 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001729 # Write out many bytes from many threads and test they were
1730 # all flushed.
1731 N = 1000
1732 contents = bytes(range(256)) * N
1733 sizes = cycle([1, 19])
1734 n = 0
1735 queue = deque()
1736 while n < len(contents):
1737 size = next(sizes)
1738 queue.append(contents[n:n+size])
1739 n += size
1740 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001741 # We use a real file object because it allows us to
1742 # exercise situations where the GIL is released before
1743 # writing the buffer to the raw streams. This is in addition
1744 # to concurrency issues due to switching threads in the middle
1745 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001746 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001747 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001748 errors = []
1749 def f():
1750 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001751 while True:
1752 try:
1753 s = queue.popleft()
1754 except IndexError:
1755 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001756 bufio.write(s)
1757 except Exception as e:
1758 errors.append(e)
1759 raise
1760 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001761 with support.start_threads(threads):
1762 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001763 self.assertFalse(errors,
1764 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001765 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001766 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001767 s = f.read()
1768 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001769 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001770 finally:
1771 support.unlink(support.TESTFN)
1772
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001773 def test_misbehaved_io(self):
1774 rawio = self.MisbehavedRawIO()
1775 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001776 self.assertRaises(OSError, bufio.seek, 0)
1777 self.assertRaises(OSError, bufio.tell)
1778 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001779
Florent Xicluna109d5732012-07-07 17:03:22 +02001780 def test_max_buffer_size_removal(self):
1781 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001782 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001783
Benjamin Peterson68623612012-12-20 11:53:11 -06001784 def test_write_error_on_close(self):
1785 raw = self.MockRawIO()
1786 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001787 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001788 raw.write = bad_write
1789 b = self.tp(raw)
1790 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001791 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001792 self.assertTrue(b.closed)
1793
benfogle9703f092017-11-10 16:03:40 -05001794 def test_slow_close_from_thread(self):
1795 # Issue #31976
1796 rawio = self.SlowFlushRawIO()
1797 bufio = self.tp(rawio, 8)
1798 t = threading.Thread(target=bufio.close)
1799 t.start()
1800 rawio.in_flush.wait()
1801 self.assertRaises(ValueError, bufio.write, b'spam')
1802 self.assertTrue(bufio.closed)
1803 t.join()
1804
1805
Benjamin Peterson59406a92009-03-26 17:10:29 +00001806
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001807class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001808 tp = io.BufferedWriter
1809
1810 def test_constructor(self):
1811 BufferedWriterTest.test_constructor(self)
1812 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08001813 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001814 if sys.maxsize > 0x7FFFFFFF:
1815 rawio = self.MockRawIO()
1816 bufio = self.tp(rawio)
1817 self.assertRaises((OverflowError, MemoryError, ValueError),
1818 bufio.__init__, rawio, sys.maxsize)
1819
1820 def test_initialization(self):
1821 rawio = self.MockRawIO()
1822 bufio = self.tp(rawio)
1823 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1824 self.assertRaises(ValueError, bufio.write, b"def")
1825 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1826 self.assertRaises(ValueError, bufio.write, b"def")
1827 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1828 self.assertRaises(ValueError, bufio.write, b"def")
1829
1830 def test_garbage_collection(self):
1831 # C BufferedWriter objects are collected, and collecting them flushes
1832 # all data to disk.
1833 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001834 with support.check_warnings(('', ResourceWarning)):
1835 rawio = self.FileIO(support.TESTFN, "w+b")
1836 f = self.tp(rawio)
1837 f.write(b"123xxx")
1838 f.x = f
1839 wr = weakref.ref(f)
1840 del f
1841 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001842 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001843 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001844 self.assertEqual(f.read(), b"123xxx")
1845
R David Murray67bfe802013-02-23 21:51:05 -05001846 def test_args_error(self):
1847 # Issue #17275
1848 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1849 self.tp(io.BytesIO(), 1024, 1024, 1024)
1850
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001851
1852class PyBufferedWriterTest(BufferedWriterTest):
1853 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001854
Guido van Rossum01a27522007-03-07 01:00:12 +00001855class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001856
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001857 def test_constructor(self):
1858 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001859 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001860
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001861 def test_uninitialized(self):
1862 pair = self.tp.__new__(self.tp)
1863 del pair
1864 pair = self.tp.__new__(self.tp)
1865 self.assertRaisesRegex((ValueError, AttributeError),
1866 'uninitialized|has no attribute',
1867 pair.read, 0)
1868 self.assertRaisesRegex((ValueError, AttributeError),
1869 'uninitialized|has no attribute',
1870 pair.write, b'')
1871 pair.__init__(self.MockRawIO(), self.MockRawIO())
1872 self.assertEqual(pair.read(0), b'')
1873 self.assertEqual(pair.write(b''), 0)
1874
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001875 def test_detach(self):
1876 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1877 self.assertRaises(self.UnsupportedOperation, pair.detach)
1878
Florent Xicluna109d5732012-07-07 17:03:22 +02001879 def test_constructor_max_buffer_size_removal(self):
1880 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001881 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001882
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001883 def test_constructor_with_not_readable(self):
1884 class NotReadable(MockRawIO):
1885 def readable(self):
1886 return False
1887
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001888 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001889
1890 def test_constructor_with_not_writeable(self):
1891 class NotWriteable(MockRawIO):
1892 def writable(self):
1893 return False
1894
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001895 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001896
1897 def test_read(self):
1898 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1899
1900 self.assertEqual(pair.read(3), b"abc")
1901 self.assertEqual(pair.read(1), b"d")
1902 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001903 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1904 self.assertEqual(pair.read(None), b"abc")
1905
1906 def test_readlines(self):
1907 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1908 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1909 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1910 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001911
1912 def test_read1(self):
1913 # .read1() is delegated to the underlying reader object, so this test
1914 # can be shallow.
1915 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1916
1917 self.assertEqual(pair.read1(3), b"abc")
Martin Panterccb2c0e2016-10-20 23:48:14 +00001918 self.assertEqual(pair.read1(), b"def")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001919
1920 def test_readinto(self):
Martin Panter6bb91f32016-05-28 00:41:57 +00001921 for method in ("readinto", "readinto1"):
1922 with self.subTest(method):
1923 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001924
Serhiy Storchaka5f1a5182016-09-11 14:41:02 +03001925 data = byteslike(b'\0' * 5)
Martin Panter6bb91f32016-05-28 00:41:57 +00001926 self.assertEqual(getattr(pair, method)(data), 5)
1927 self.assertEqual(bytes(data), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001928
1929 def test_write(self):
1930 w = self.MockRawIO()
1931 pair = self.tp(self.MockRawIO(), w)
1932
1933 pair.write(b"abc")
1934 pair.flush()
Martin Panter6bb91f32016-05-28 00:41:57 +00001935 buffer = bytearray(b"def")
1936 pair.write(buffer)
1937 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001938 pair.flush()
1939 self.assertEqual(w._write_stack, [b"abc", b"def"])
1940
1941 def test_peek(self):
1942 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1943
1944 self.assertTrue(pair.peek(3).startswith(b"abc"))
1945 self.assertEqual(pair.read(3), b"abc")
1946
1947 def test_readable(self):
1948 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1949 self.assertTrue(pair.readable())
1950
1951 def test_writeable(self):
1952 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1953 self.assertTrue(pair.writable())
1954
1955 def test_seekable(self):
1956 # BufferedRWPairs are never seekable, even if their readers and writers
1957 # are.
1958 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1959 self.assertFalse(pair.seekable())
1960
1961 # .flush() is delegated to the underlying writer object and has been
1962 # tested in the test_write method.
1963
1964 def test_close_and_closed(self):
1965 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1966 self.assertFalse(pair.closed)
1967 pair.close()
1968 self.assertTrue(pair.closed)
1969
Serhiy Storchaka7665be62015-03-24 23:21:57 +02001970 def test_reader_close_error_on_close(self):
1971 def reader_close():
1972 reader_non_existing
1973 reader = self.MockRawIO()
1974 reader.close = reader_close
1975 writer = self.MockRawIO()
1976 pair = self.tp(reader, writer)
1977 with self.assertRaises(NameError) as err:
1978 pair.close()
1979 self.assertIn('reader_non_existing', str(err.exception))
1980 self.assertTrue(pair.closed)
1981 self.assertFalse(reader.closed)
1982 self.assertTrue(writer.closed)
1983
1984 def test_writer_close_error_on_close(self):
1985 def writer_close():
1986 writer_non_existing
1987 reader = self.MockRawIO()
1988 writer = self.MockRawIO()
1989 writer.close = writer_close
1990 pair = self.tp(reader, writer)
1991 with self.assertRaises(NameError) as err:
1992 pair.close()
1993 self.assertIn('writer_non_existing', str(err.exception))
1994 self.assertFalse(pair.closed)
1995 self.assertTrue(reader.closed)
1996 self.assertFalse(writer.closed)
1997
1998 def test_reader_writer_close_error_on_close(self):
1999 def reader_close():
2000 reader_non_existing
2001 def writer_close():
2002 writer_non_existing
2003 reader = self.MockRawIO()
2004 reader.close = reader_close
2005 writer = self.MockRawIO()
2006 writer.close = writer_close
2007 pair = self.tp(reader, writer)
2008 with self.assertRaises(NameError) as err:
2009 pair.close()
2010 self.assertIn('reader_non_existing', str(err.exception))
2011 self.assertIsInstance(err.exception.__context__, NameError)
2012 self.assertIn('writer_non_existing', str(err.exception.__context__))
2013 self.assertFalse(pair.closed)
2014 self.assertFalse(reader.closed)
2015 self.assertFalse(writer.closed)
2016
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002017 def test_isatty(self):
2018 class SelectableIsAtty(MockRawIO):
2019 def __init__(self, isatty):
2020 MockRawIO.__init__(self)
2021 self._isatty = isatty
2022
2023 def isatty(self):
2024 return self._isatty
2025
2026 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
2027 self.assertFalse(pair.isatty())
2028
2029 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
2030 self.assertTrue(pair.isatty())
2031
2032 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
2033 self.assertTrue(pair.isatty())
2034
2035 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
2036 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00002037
Benjamin Petersonbbd0a322014-09-29 22:46:57 -04002038 def test_weakref_clearing(self):
2039 brw = self.tp(self.MockRawIO(), self.MockRawIO())
2040 ref = weakref.ref(brw)
2041 brw = None
2042 ref = None # Shouldn't segfault.
2043
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002044class CBufferedRWPairTest(BufferedRWPairTest):
2045 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002046
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002047class PyBufferedRWPairTest(BufferedRWPairTest):
2048 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002049
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002050
2051class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
2052 read_mode = "rb+"
2053 write_mode = "wb+"
2054
2055 def test_constructor(self):
2056 BufferedReaderTest.test_constructor(self)
2057 BufferedWriterTest.test_constructor(self)
2058
Serhiy Storchaka61e24932014-02-12 10:52:35 +02002059 def test_uninitialized(self):
2060 BufferedReaderTest.test_uninitialized(self)
2061 BufferedWriterTest.test_uninitialized(self)
2062
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002063 def test_read_and_write(self):
2064 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00002065 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00002066
2067 self.assertEqual(b"as", rw.read(2))
2068 rw.write(b"ddd")
2069 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002070 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002071 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002072 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00002073
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002074 def test_seek_and_tell(self):
2075 raw = self.BytesIO(b"asdfghjkl")
2076 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00002077
Ezio Melottib3aedd42010-11-20 19:04:17 +00002078 self.assertEqual(b"as", rw.read(2))
2079 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002080 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002081 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00002082
Antoine Pitroue05565e2011-08-20 14:39:23 +02002083 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00002084 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02002085 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002086 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002087 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002088 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002089 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002090 self.assertEqual(7, rw.tell())
2091 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02002092 rw.flush()
2093 self.assertEqual(b"asdf123fl", raw.getvalue())
2094
Christian Heimes8e42a0a2007-11-08 18:04:45 +00002095 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00002096
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002097 def check_flush_and_read(self, read_func):
2098 raw = self.BytesIO(b"abcdefghi")
2099 bufio = self.tp(raw)
2100
Ezio Melottib3aedd42010-11-20 19:04:17 +00002101 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002102 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002103 self.assertEqual(b"ef", read_func(bufio, 2))
2104 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002105 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002106 self.assertEqual(6, bufio.tell())
2107 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002108 raw.seek(0, 0)
2109 raw.write(b"XYZ")
2110 # flush() resets the read buffer
2111 bufio.flush()
2112 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002113 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002114
2115 def test_flush_and_read(self):
2116 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2117
2118 def test_flush_and_readinto(self):
2119 def _readinto(bufio, n=-1):
2120 b = bytearray(n if n >= 0 else 9999)
2121 n = bufio.readinto(b)
2122 return bytes(b[:n])
2123 self.check_flush_and_read(_readinto)
2124
2125 def test_flush_and_peek(self):
2126 def _peek(bufio, n=-1):
2127 # This relies on the fact that the buffer can contain the whole
2128 # raw stream, otherwise peek() can return less.
2129 b = bufio.peek(n)
2130 if n != -1:
2131 b = b[:n]
2132 bufio.seek(len(b), 1)
2133 return b
2134 self.check_flush_and_read(_peek)
2135
2136 def test_flush_and_write(self):
2137 raw = self.BytesIO(b"abcdefghi")
2138 bufio = self.tp(raw)
2139
2140 bufio.write(b"123")
2141 bufio.flush()
2142 bufio.write(b"45")
2143 bufio.flush()
2144 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002145 self.assertEqual(b"12345fghi", raw.getvalue())
2146 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002147
2148 def test_threads(self):
2149 BufferedReaderTest.test_threads(self)
2150 BufferedWriterTest.test_threads(self)
2151
2152 def test_writes_and_peek(self):
2153 def _peek(bufio):
2154 bufio.peek(1)
2155 self.check_writes(_peek)
2156 def _peek(bufio):
2157 pos = bufio.tell()
2158 bufio.seek(-1, 1)
2159 bufio.peek(1)
2160 bufio.seek(pos, 0)
2161 self.check_writes(_peek)
2162
2163 def test_writes_and_reads(self):
2164 def _read(bufio):
2165 bufio.seek(-1, 1)
2166 bufio.read(1)
2167 self.check_writes(_read)
2168
2169 def test_writes_and_read1s(self):
2170 def _read1(bufio):
2171 bufio.seek(-1, 1)
2172 bufio.read1(1)
2173 self.check_writes(_read1)
2174
2175 def test_writes_and_readintos(self):
2176 def _read(bufio):
2177 bufio.seek(-1, 1)
2178 bufio.readinto(bytearray(1))
2179 self.check_writes(_read)
2180
Antoine Pitroua0ceb732009-08-06 20:29:56 +00002181 def test_write_after_readahead(self):
2182 # Issue #6629: writing after the buffer was filled by readahead should
2183 # first rewind the raw stream.
2184 for overwrite_size in [1, 5]:
2185 raw = self.BytesIO(b"A" * 10)
2186 bufio = self.tp(raw, 4)
2187 # Trigger readahead
2188 self.assertEqual(bufio.read(1), b"A")
2189 self.assertEqual(bufio.tell(), 1)
2190 # Overwriting should rewind the raw stream if it needs so
2191 bufio.write(b"B" * overwrite_size)
2192 self.assertEqual(bufio.tell(), overwrite_size + 1)
2193 # If the write size was smaller than the buffer size, flush() and
2194 # check that rewind happens.
2195 bufio.flush()
2196 self.assertEqual(bufio.tell(), overwrite_size + 1)
2197 s = raw.getvalue()
2198 self.assertEqual(s,
2199 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2200
Antoine Pitrou7c404892011-05-13 00:13:33 +02002201 def test_write_rewind_write(self):
2202 # Various combinations of reading / writing / seeking backwards / writing again
2203 def mutate(bufio, pos1, pos2):
2204 assert pos2 >= pos1
2205 # Fill the buffer
2206 bufio.seek(pos1)
2207 bufio.read(pos2 - pos1)
2208 bufio.write(b'\x02')
2209 # This writes earlier than the previous write, but still inside
2210 # the buffer.
2211 bufio.seek(pos1)
2212 bufio.write(b'\x01')
2213
2214 b = b"\x80\x81\x82\x83\x84"
2215 for i in range(0, len(b)):
2216 for j in range(i, len(b)):
2217 raw = self.BytesIO(b)
2218 bufio = self.tp(raw, 100)
2219 mutate(bufio, i, j)
2220 bufio.flush()
2221 expected = bytearray(b)
2222 expected[j] = 2
2223 expected[i] = 1
2224 self.assertEqual(raw.getvalue(), expected,
2225 "failed result for i=%d, j=%d" % (i, j))
2226
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00002227 def test_truncate_after_read_or_write(self):
2228 raw = self.BytesIO(b"A" * 10)
2229 bufio = self.tp(raw, 100)
2230 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2231 self.assertEqual(bufio.truncate(), 2)
2232 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2233 self.assertEqual(bufio.truncate(), 4)
2234
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002235 def test_misbehaved_io(self):
2236 BufferedReaderTest.test_misbehaved_io(self)
2237 BufferedWriterTest.test_misbehaved_io(self)
2238
Antoine Pitroue05565e2011-08-20 14:39:23 +02002239 def test_interleaved_read_write(self):
2240 # Test for issue #12213
2241 with self.BytesIO(b'abcdefgh') as raw:
2242 with self.tp(raw, 100) as f:
2243 f.write(b"1")
2244 self.assertEqual(f.read(1), b'b')
2245 f.write(b'2')
2246 self.assertEqual(f.read1(1), b'd')
2247 f.write(b'3')
2248 buf = bytearray(1)
2249 f.readinto(buf)
2250 self.assertEqual(buf, b'f')
2251 f.write(b'4')
2252 self.assertEqual(f.peek(1), b'h')
2253 f.flush()
2254 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2255
2256 with self.BytesIO(b'abc') as raw:
2257 with self.tp(raw, 100) as f:
2258 self.assertEqual(f.read(1), b'a')
2259 f.write(b"2")
2260 self.assertEqual(f.read(1), b'c')
2261 f.flush()
2262 self.assertEqual(raw.getvalue(), b'a2c')
2263
2264 def test_interleaved_readline_write(self):
2265 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2266 with self.tp(raw) as f:
2267 f.write(b'1')
2268 self.assertEqual(f.readline(), b'b\n')
2269 f.write(b'2')
2270 self.assertEqual(f.readline(), b'def\n')
2271 f.write(b'3')
2272 self.assertEqual(f.readline(), b'\n')
2273 f.flush()
2274 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2275
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002276 # You can't construct a BufferedRandom over a non-seekable stream.
2277 test_unseekable = None
2278
R David Murray67bfe802013-02-23 21:51:05 -05002279
Antoine Pitrou10f0c502012-07-29 19:02:46 +02002280class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002281 tp = io.BufferedRandom
2282
2283 def test_constructor(self):
2284 BufferedRandomTest.test_constructor(self)
2285 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08002286 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002287 if sys.maxsize > 0x7FFFFFFF:
2288 rawio = self.MockRawIO()
2289 bufio = self.tp(rawio)
2290 self.assertRaises((OverflowError, MemoryError, ValueError),
2291 bufio.__init__, rawio, sys.maxsize)
2292
2293 def test_garbage_collection(self):
2294 CBufferedReaderTest.test_garbage_collection(self)
2295 CBufferedWriterTest.test_garbage_collection(self)
2296
R David Murray67bfe802013-02-23 21:51:05 -05002297 def test_args_error(self):
2298 # Issue #17275
2299 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
2300 self.tp(io.BytesIO(), 1024, 1024, 1024)
2301
2302
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002303class PyBufferedRandomTest(BufferedRandomTest):
2304 tp = pyio.BufferedRandom
2305
2306
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002307# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2308# properties:
2309# - A single output character can correspond to many bytes of input.
2310# - The number of input bytes to complete the character can be
2311# undetermined until the last input byte is received.
2312# - The number of input bytes can vary depending on previous input.
2313# - A single input byte can correspond to many characters of output.
2314# - The number of output characters can be undetermined until the
2315# last input byte is received.
2316# - The number of output characters can vary depending on previous input.
2317
2318class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
2319 """
2320 For testing seek/tell behavior with a stateful, buffering decoder.
2321
2322 Input is a sequence of words. Words may be fixed-length (length set
2323 by input) or variable-length (period-terminated). In variable-length
2324 mode, extra periods are ignored. Possible words are:
2325 - 'i' followed by a number sets the input length, I (maximum 99).
2326 When I is set to 0, words are space-terminated.
2327 - 'o' followed by a number sets the output length, O (maximum 99).
2328 - Any other word is converted into a word followed by a period on
2329 the output. The output word consists of the input word truncated
2330 or padded out with hyphens to make its length equal to O. If O
2331 is 0, the word is output verbatim without truncating or padding.
2332 I and O are initially set to 1. When I changes, any buffered input is
2333 re-scanned according to the new I. EOF also terminates the last word.
2334 """
2335
2336 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00002337 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002338 self.reset()
2339
2340 def __repr__(self):
2341 return '<SID %x>' % id(self)
2342
2343 def reset(self):
2344 self.i = 1
2345 self.o = 1
2346 self.buffer = bytearray()
2347
2348 def getstate(self):
2349 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
2350 return bytes(self.buffer), i*100 + o
2351
2352 def setstate(self, state):
2353 buffer, io = state
2354 self.buffer = bytearray(buffer)
2355 i, o = divmod(io, 100)
2356 self.i, self.o = i ^ 1, o ^ 1
2357
2358 def decode(self, input, final=False):
2359 output = ''
2360 for b in input:
2361 if self.i == 0: # variable-length, terminated with period
2362 if b == ord('.'):
2363 if self.buffer:
2364 output += self.process_word()
2365 else:
2366 self.buffer.append(b)
2367 else: # fixed-length, terminate after self.i bytes
2368 self.buffer.append(b)
2369 if len(self.buffer) == self.i:
2370 output += self.process_word()
2371 if final and self.buffer: # EOF terminates the last word
2372 output += self.process_word()
2373 return output
2374
2375 def process_word(self):
2376 output = ''
2377 if self.buffer[0] == ord('i'):
2378 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2379 elif self.buffer[0] == ord('o'):
2380 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2381 else:
2382 output = self.buffer.decode('ascii')
2383 if len(output) < self.o:
2384 output += '-'*self.o # pad out with hyphens
2385 if self.o:
2386 output = output[:self.o] # truncate to output length
2387 output += '.'
2388 self.buffer = bytearray()
2389 return output
2390
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002391 codecEnabled = False
2392
2393 @classmethod
2394 def lookupTestDecoder(cls, name):
2395 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00002396 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002397 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00002398 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002399 incrementalencoder=None,
2400 streamreader=None, streamwriter=None,
2401 incrementaldecoder=cls)
2402
2403# Register the previous decoder for testing.
2404# Disabled by default, tests will enable it.
2405codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2406
2407
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002408class StatefulIncrementalDecoderTest(unittest.TestCase):
2409 """
2410 Make sure the StatefulIncrementalDecoder actually works.
2411 """
2412
2413 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002414 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002415 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002416 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002417 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002418 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002419 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002420 # I=0, O=6 (variable-length input, fixed-length output)
2421 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2422 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002423 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002424 # I=6, O=3 (fixed-length input > fixed-length output)
2425 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2426 # I=0, then 3; O=29, then 15 (with longer output)
2427 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2428 'a----------------------------.' +
2429 'b----------------------------.' +
2430 'cde--------------------------.' +
2431 'abcdefghijabcde.' +
2432 'a.b------------.' +
2433 '.c.------------.' +
2434 'd.e------------.' +
2435 'k--------------.' +
2436 'l--------------.' +
2437 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002438 ]
2439
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002440 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002441 # Try a few one-shot test cases.
2442 for input, eof, output in self.test_cases:
2443 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002444 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002445
2446 # Also test an unfinished decode, followed by forcing EOF.
2447 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002448 self.assertEqual(d.decode(b'oiabcd'), '')
2449 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002450
2451class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002452
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002453 def setUp(self):
2454 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2455 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002456 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002457
Guido van Rossumd0712812007-04-11 16:32:43 +00002458 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002459 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002460
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002461 def test_constructor(self):
2462 r = self.BytesIO(b"\xc3\xa9\n\n")
2463 b = self.BufferedReader(r, 1000)
2464 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002465 t.__init__(b, encoding="latin-1", newline="\r\n")
2466 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002467 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002468 t.__init__(b, encoding="utf-8", line_buffering=True)
2469 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002470 self.assertEqual(t.line_buffering, True)
2471 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002472 self.assertRaises(TypeError, t.__init__, b, newline=42)
2473 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2474
Serhiy Storchakaf24131f2015-04-16 11:19:43 +03002475 def test_uninitialized(self):
2476 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2477 del t
2478 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2479 self.assertRaises(Exception, repr, t)
2480 self.assertRaisesRegex((ValueError, AttributeError),
2481 'uninitialized|has no attribute',
2482 t.read, 0)
2483 t.__init__(self.MockRawIO())
2484 self.assertEqual(t.read(0), '')
2485
Nick Coghlana9b15242014-02-04 22:11:18 +10002486 def test_non_text_encoding_codecs_are_rejected(self):
2487 # Ensure the constructor complains if passed a codec that isn't
2488 # marked as a text encoding
2489 # http://bugs.python.org/issue20404
2490 r = self.BytesIO()
2491 b = self.BufferedWriter(r)
2492 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2493 self.TextIOWrapper(b, encoding="hex")
2494
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002495 def test_detach(self):
2496 r = self.BytesIO()
2497 b = self.BufferedWriter(r)
2498 t = self.TextIOWrapper(b)
2499 self.assertIs(t.detach(), b)
2500
2501 t = self.TextIOWrapper(b, encoding="ascii")
2502 t.write("howdy")
2503 self.assertFalse(r.getvalue())
2504 t.detach()
2505 self.assertEqual(r.getvalue(), b"howdy")
2506 self.assertRaises(ValueError, t.detach)
2507
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002508 # Operations independent of the detached stream should still work
2509 repr(t)
2510 self.assertEqual(t.encoding, "ascii")
2511 self.assertEqual(t.errors, "strict")
2512 self.assertFalse(t.line_buffering)
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002513 self.assertFalse(t.write_through)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002514
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002515 def test_repr(self):
2516 raw = self.BytesIO("hello".encode("utf-8"))
2517 b = self.BufferedReader(raw)
2518 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002519 modname = self.TextIOWrapper.__module__
2520 self.assertEqual(repr(t),
2521 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2522 raw.name = "dummy"
2523 self.assertEqual(repr(t),
2524 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002525 t.mode = "r"
2526 self.assertEqual(repr(t),
2527 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002528 raw.name = b"dummy"
2529 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002530 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002531
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002532 t.buffer.detach()
2533 repr(t) # Should not raise an exception
2534
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02002535 def test_recursive_repr(self):
2536 # Issue #25455
2537 raw = self.BytesIO()
2538 t = self.TextIOWrapper(raw)
2539 with support.swap_attr(raw, 'name', t):
2540 try:
2541 repr(t) # Should not crash
2542 except RuntimeError:
2543 pass
2544
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002545 def test_line_buffering(self):
2546 r = self.BytesIO()
2547 b = self.BufferedWriter(r, 1000)
2548 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002549 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002550 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002551 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002552 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002553 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002554 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002555
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002556 def test_reconfigure_line_buffering(self):
2557 r = self.BytesIO()
2558 b = self.BufferedWriter(r, 1000)
2559 t = self.TextIOWrapper(b, newline="\n", line_buffering=False)
2560 t.write("AB\nC")
2561 self.assertEqual(r.getvalue(), b"")
2562
2563 t.reconfigure(line_buffering=True) # implicit flush
2564 self.assertEqual(r.getvalue(), b"AB\nC")
2565 t.write("DEF\nG")
2566 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2567 t.write("H")
2568 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2569 t.reconfigure(line_buffering=False) # implicit flush
2570 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2571 t.write("IJ")
2572 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2573
2574 # Keeping default value
2575 t.reconfigure()
2576 t.reconfigure(line_buffering=None)
2577 self.assertEqual(t.line_buffering, False)
2578 t.reconfigure(line_buffering=True)
2579 t.reconfigure()
2580 t.reconfigure(line_buffering=None)
2581 self.assertEqual(t.line_buffering, True)
2582
Victor Stinner91106cd2017-12-13 12:29:09 +01002583 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002584 def test_default_encoding(self):
2585 old_environ = dict(os.environ)
2586 try:
2587 # try to get a user preferred encoding different than the current
2588 # locale encoding to check that TextIOWrapper() uses the current
2589 # locale encoding and not the user preferred encoding
2590 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2591 if key in os.environ:
2592 del os.environ[key]
2593
2594 current_locale_encoding = locale.getpreferredencoding(False)
2595 b = self.BytesIO()
2596 t = self.TextIOWrapper(b)
2597 self.assertEqual(t.encoding, current_locale_encoding)
2598 finally:
2599 os.environ.clear()
2600 os.environ.update(old_environ)
2601
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002602 @support.cpython_only
Victor Stinner91106cd2017-12-13 12:29:09 +01002603 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Serhiy Storchaka78980432013-01-15 01:12:17 +02002604 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002605 # Issue 15989
2606 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002607 b = self.BytesIO()
2608 b.fileno = lambda: _testcapi.INT_MAX + 1
2609 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2610 b.fileno = lambda: _testcapi.UINT_MAX + 1
2611 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2612
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002613 def test_encoding(self):
2614 # Check the encoding attribute is always set, and valid
2615 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002616 t = self.TextIOWrapper(b, encoding="utf-8")
2617 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002618 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002619 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002620 codecs.lookup(t.encoding)
2621
2622 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002623 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002624 b = self.BytesIO(b"abc\n\xff\n")
2625 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002626 self.assertRaises(UnicodeError, t.read)
2627 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002628 b = self.BytesIO(b"abc\n\xff\n")
2629 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002630 self.assertRaises(UnicodeError, t.read)
2631 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002632 b = self.BytesIO(b"abc\n\xff\n")
2633 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002634 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002635 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002636 b = self.BytesIO(b"abc\n\xff\n")
2637 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002638 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002639
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002640 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002641 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002642 b = self.BytesIO()
2643 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002644 self.assertRaises(UnicodeError, t.write, "\xff")
2645 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002646 b = self.BytesIO()
2647 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002648 self.assertRaises(UnicodeError, t.write, "\xff")
2649 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002650 b = self.BytesIO()
2651 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002652 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002653 t.write("abc\xffdef\n")
2654 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002655 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002656 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002657 b = self.BytesIO()
2658 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002659 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002660 t.write("abc\xffdef\n")
2661 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002662 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002663
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002664 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002665 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2666
2667 tests = [
2668 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002669 [ '', input_lines ],
2670 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2671 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2672 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002673 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002674 encodings = (
2675 'utf-8', 'latin-1',
2676 'utf-16', 'utf-16-le', 'utf-16-be',
2677 'utf-32', 'utf-32-le', 'utf-32-be',
2678 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002679
Guido van Rossum8358db22007-08-18 21:39:55 +00002680 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002681 # character in TextIOWrapper._pending_line.
2682 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002683 # XXX: str.encode() should return bytes
2684 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002685 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002686 for bufsize in range(1, 10):
2687 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002688 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2689 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002690 encoding=encoding)
2691 if do_reads:
2692 got_lines = []
2693 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002694 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002695 if c2 == '':
2696 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002697 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002698 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002699 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002700 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002701
2702 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002703 self.assertEqual(got_line, exp_line)
2704 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002705
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002706 def test_newlines_input(self):
2707 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002708 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2709 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002710 (None, normalized.decode("ascii").splitlines(keepends=True)),
2711 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002712 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2713 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2714 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002715 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002716 buf = self.BytesIO(testdata)
2717 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002718 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002719 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002720 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002721
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002722 def test_newlines_output(self):
2723 testdict = {
2724 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2725 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2726 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2727 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2728 }
2729 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2730 for newline, expected in tests:
2731 buf = self.BytesIO()
2732 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2733 txt.write("AAA\nB")
2734 txt.write("BB\nCCC\n")
2735 txt.write("X\rY\r\nZ")
2736 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002737 self.assertEqual(buf.closed, False)
2738 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002739
2740 def test_destructor(self):
2741 l = []
2742 base = self.BytesIO
2743 class MyBytesIO(base):
2744 def close(self):
2745 l.append(self.getvalue())
2746 base.close(self)
2747 b = MyBytesIO()
2748 t = self.TextIOWrapper(b, encoding="ascii")
2749 t.write("abc")
2750 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002751 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002752 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002753
2754 def test_override_destructor(self):
2755 record = []
2756 class MyTextIO(self.TextIOWrapper):
2757 def __del__(self):
2758 record.append(1)
2759 try:
2760 f = super().__del__
2761 except AttributeError:
2762 pass
2763 else:
2764 f()
2765 def close(self):
2766 record.append(2)
2767 super().close()
2768 def flush(self):
2769 record.append(3)
2770 super().flush()
2771 b = self.BytesIO()
2772 t = MyTextIO(b, encoding="ascii")
2773 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002774 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002775 self.assertEqual(record, [1, 2, 3])
2776
2777 def test_error_through_destructor(self):
2778 # Test that the exception state is not modified by a destructor,
2779 # even if close() fails.
2780 rawio = self.CloseFailureIO()
2781 def f():
2782 self.TextIOWrapper(rawio).xyzzy
2783 with support.captured_output("stderr") as s:
2784 self.assertRaises(AttributeError, f)
2785 s = s.getvalue().strip()
2786 if s:
2787 # The destructor *may* have printed an unraisable error, check it
2788 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002789 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002790 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002791
Guido van Rossum9b76da62007-04-11 01:09:03 +00002792 # Systematic tests of the text I/O API
2793
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002794 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002795 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002796 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002797 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002798 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002799 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002800 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002801 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002802 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002803 self.assertEqual(f.tell(), 0)
2804 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002805 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002806 self.assertEqual(f.seek(0), 0)
2807 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002808 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002809 self.assertEqual(f.read(2), "ab")
2810 self.assertEqual(f.read(1), "c")
2811 self.assertEqual(f.read(1), "")
2812 self.assertEqual(f.read(), "")
2813 self.assertEqual(f.tell(), cookie)
2814 self.assertEqual(f.seek(0), 0)
2815 self.assertEqual(f.seek(0, 2), cookie)
2816 self.assertEqual(f.write("def"), 3)
2817 self.assertEqual(f.seek(cookie), cookie)
2818 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002819 if enc.startswith("utf"):
2820 self.multi_line_test(f, enc)
2821 f.close()
2822
2823 def multi_line_test(self, f, enc):
2824 f.seek(0)
2825 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002826 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002827 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002828 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002829 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002830 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002831 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002832 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002833 wlines.append((f.tell(), line))
2834 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002835 f.seek(0)
2836 rlines = []
2837 while True:
2838 pos = f.tell()
2839 line = f.readline()
2840 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002841 break
2842 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002843 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002844
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002845 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002846 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002847 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002848 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002849 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002850 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002851 p2 = f.tell()
2852 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002853 self.assertEqual(f.tell(), p0)
2854 self.assertEqual(f.readline(), "\xff\n")
2855 self.assertEqual(f.tell(), p1)
2856 self.assertEqual(f.readline(), "\xff\n")
2857 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002858 f.seek(0)
2859 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002860 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002861 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002862 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002863 f.close()
2864
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002865 def test_seeking(self):
2866 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002867 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002868 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002869 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002870 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002871 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002872 suffix = bytes(u_suffix.encode("utf-8"))
2873 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002874 with self.open(support.TESTFN, "wb") as f:
2875 f.write(line*2)
2876 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2877 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002878 self.assertEqual(s, str(prefix, "ascii"))
2879 self.assertEqual(f.tell(), prefix_size)
2880 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002881
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002882 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002883 # Regression test for a specific bug
2884 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002885 with self.open(support.TESTFN, "wb") as f:
2886 f.write(data)
2887 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2888 f._CHUNK_SIZE # Just test that it exists
2889 f._CHUNK_SIZE = 2
2890 f.readline()
2891 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002892
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002893 def test_seek_and_tell(self):
2894 #Test seek/tell using the StatefulIncrementalDecoder.
2895 # Make test faster by doing smaller seeks
2896 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002897
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002898 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002899 """Tell/seek to various points within a data stream and ensure
2900 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002901 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002902 f.write(data)
2903 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002904 f = self.open(support.TESTFN, encoding='test_decoder')
2905 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002906 decoded = f.read()
2907 f.close()
2908
Neal Norwitze2b07052008-03-18 19:52:05 +00002909 for i in range(min_pos, len(decoded) + 1): # seek positions
2910 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002911 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002912 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002913 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002914 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002915 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002916 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002917 f.close()
2918
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002919 # Enable the test decoder.
2920 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002921
2922 # Run the tests.
2923 try:
2924 # Try each test case.
2925 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002926 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002927
2928 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002929 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2930 offset = CHUNK_SIZE - len(input)//2
2931 prefix = b'.'*offset
2932 # Don't bother seeking into the prefix (takes too long).
2933 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002934 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002935
2936 # Ensure our test decoder won't interfere with subsequent tests.
2937 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002938 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002939
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002940 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002941 data = "1234567890"
2942 tests = ("utf-16",
2943 "utf-16-le",
2944 "utf-16-be",
2945 "utf-32",
2946 "utf-32-le",
2947 "utf-32-be")
2948 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002949 buf = self.BytesIO()
2950 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002951 # Check if the BOM is written only once (see issue1753).
2952 f.write(data)
2953 f.write(data)
2954 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002955 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002956 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002957 self.assertEqual(f.read(), data * 2)
2958 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002959
Benjamin Petersona1b49012009-03-31 23:11:32 +00002960 def test_unreadable(self):
2961 class UnReadable(self.BytesIO):
2962 def readable(self):
2963 return False
2964 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002965 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002966
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002967 def test_read_one_by_one(self):
2968 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002969 reads = ""
2970 while True:
2971 c = txt.read(1)
2972 if not c:
2973 break
2974 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002975 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002976
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002977 def test_readlines(self):
2978 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2979 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2980 txt.seek(0)
2981 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2982 txt.seek(0)
2983 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2984
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002985 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002986 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002987 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002988 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002989 reads = ""
2990 while True:
2991 c = txt.read(128)
2992 if not c:
2993 break
2994 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002995 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002996
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002997 def test_writelines(self):
2998 l = ['ab', 'cd', 'ef']
2999 buf = self.BytesIO()
3000 txt = self.TextIOWrapper(buf)
3001 txt.writelines(l)
3002 txt.flush()
3003 self.assertEqual(buf.getvalue(), b'abcdef')
3004
3005 def test_writelines_userlist(self):
3006 l = UserList(['ab', 'cd', 'ef'])
3007 buf = self.BytesIO()
3008 txt = self.TextIOWrapper(buf)
3009 txt.writelines(l)
3010 txt.flush()
3011 self.assertEqual(buf.getvalue(), b'abcdef')
3012
3013 def test_writelines_error(self):
3014 txt = self.TextIOWrapper(self.BytesIO())
3015 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3016 self.assertRaises(TypeError, txt.writelines, None)
3017 self.assertRaises(TypeError, txt.writelines, b'abc')
3018
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003019 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003020 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003021
3022 # read one char at a time
3023 reads = ""
3024 while True:
3025 c = txt.read(1)
3026 if not c:
3027 break
3028 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003029 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003030
3031 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003032 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003033 txt._CHUNK_SIZE = 4
3034
3035 reads = ""
3036 while True:
3037 c = txt.read(4)
3038 if not c:
3039 break
3040 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003041 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003042
3043 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003044 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003045 txt._CHUNK_SIZE = 4
3046
3047 reads = txt.read(4)
3048 reads += txt.read(4)
3049 reads += txt.readline()
3050 reads += txt.readline()
3051 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003052 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003053
3054 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003055 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003056 txt._CHUNK_SIZE = 4
3057
3058 reads = txt.read(4)
3059 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003060 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003061
3062 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003063 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003064 txt._CHUNK_SIZE = 4
3065
3066 reads = txt.read(4)
3067 pos = txt.tell()
3068 txt.seek(0)
3069 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003070 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003071
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003072 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003073 buffer = self.BytesIO(self.testdata)
3074 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003075
3076 self.assertEqual(buffer.seekable(), txt.seekable())
3077
Antoine Pitroue4501852009-05-14 18:55:55 +00003078 def test_append_bom(self):
3079 # The BOM is not written again when appending to a non-empty file
3080 filename = support.TESTFN
3081 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3082 with self.open(filename, 'w', encoding=charset) as f:
3083 f.write('aaa')
3084 pos = f.tell()
3085 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003086 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003087
3088 with self.open(filename, 'a', encoding=charset) as f:
3089 f.write('xxx')
3090 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003091 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003092
3093 def test_seek_bom(self):
3094 # Same test, but when seeking manually
3095 filename = support.TESTFN
3096 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3097 with self.open(filename, 'w', encoding=charset) as f:
3098 f.write('aaa')
3099 pos = f.tell()
3100 with self.open(filename, 'r+', encoding=charset) as f:
3101 f.seek(pos)
3102 f.write('zzz')
3103 f.seek(0)
3104 f.write('bbb')
3105 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003106 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003107
Antoine Pitrou85e3ee72015-04-13 20:01:21 +02003108 def test_seek_append_bom(self):
3109 # Same test, but first seek to the start and then to the end
3110 filename = support.TESTFN
3111 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3112 with self.open(filename, 'w', encoding=charset) as f:
3113 f.write('aaa')
3114 with self.open(filename, 'a', encoding=charset) as f:
3115 f.seek(0)
3116 f.seek(0, self.SEEK_END)
3117 f.write('xxx')
3118 with self.open(filename, 'rb') as f:
3119 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3120
Benjamin Peterson0926ad12009-06-06 18:02:12 +00003121 def test_errors_property(self):
3122 with self.open(support.TESTFN, "w") as f:
3123 self.assertEqual(f.errors, "strict")
3124 with self.open(support.TESTFN, "w", errors="replace") as f:
3125 self.assertEqual(f.errors, "replace")
3126
Brett Cannon31f59292011-02-21 19:29:56 +00003127 @support.no_tracing
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003128 def test_threads_write(self):
3129 # Issue6750: concurrent writes could duplicate data
3130 event = threading.Event()
3131 with self.open(support.TESTFN, "w", buffering=1) as f:
3132 def run(n):
3133 text = "Thread%03d\n" % n
3134 event.wait()
3135 f.write(text)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003136 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003137 for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003138 with support.start_threads(threads, event.set):
3139 time.sleep(0.02)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003140 with self.open(support.TESTFN) as f:
3141 content = f.read()
3142 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003143 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003144
Antoine Pitrou6be88762010-05-03 16:48:20 +00003145 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003146 # Test that text file is closed despite failed flush
3147 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00003148 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003149 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00003150 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003151 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003152 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00003153 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003154 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06003155 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003156 self.assertTrue(txt.buffer.closed)
3157 self.assertTrue(closed) # flush() called
3158 self.assertFalse(closed[0]) # flush() called before file closed
3159 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02003160 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003161
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003162 def test_close_error_on_close(self):
3163 buffer = self.BytesIO(self.testdata)
3164 def bad_flush():
3165 raise OSError('flush')
3166 def bad_close():
3167 raise OSError('close')
3168 buffer.close = bad_close
3169 txt = self.TextIOWrapper(buffer, encoding="ascii")
3170 txt.flush = bad_flush
3171 with self.assertRaises(OSError) as err: # exception not swallowed
3172 txt.close()
3173 self.assertEqual(err.exception.args, ('close',))
3174 self.assertIsInstance(err.exception.__context__, OSError)
3175 self.assertEqual(err.exception.__context__.args, ('flush',))
3176 self.assertFalse(txt.closed)
3177
3178 def test_nonnormalized_close_error_on_close(self):
3179 # Issue #21677
3180 buffer = self.BytesIO(self.testdata)
3181 def bad_flush():
3182 raise non_existing_flush
3183 def bad_close():
3184 raise non_existing_close
3185 buffer.close = bad_close
3186 txt = self.TextIOWrapper(buffer, encoding="ascii")
3187 txt.flush = bad_flush
3188 with self.assertRaises(NameError) as err: # exception not swallowed
3189 txt.close()
3190 self.assertIn('non_existing_close', str(err.exception))
3191 self.assertIsInstance(err.exception.__context__, NameError)
3192 self.assertIn('non_existing_flush', str(err.exception.__context__))
3193 self.assertFalse(txt.closed)
3194
Antoine Pitrou6be88762010-05-03 16:48:20 +00003195 def test_multi_close(self):
3196 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3197 txt.close()
3198 txt.close()
3199 txt.close()
3200 self.assertRaises(ValueError, txt.flush)
3201
Antoine Pitrou0d739d72010-09-05 23:01:12 +00003202 def test_unseekable(self):
3203 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
3204 self.assertRaises(self.UnsupportedOperation, txt.tell)
3205 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3206
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003207 def test_readonly_attributes(self):
3208 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3209 buf = self.BytesIO(self.testdata)
3210 with self.assertRaises(AttributeError):
3211 txt.buffer = buf
3212
Antoine Pitroue96ec682011-07-23 21:46:35 +02003213 def test_rawio(self):
3214 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3215 # that subprocess.Popen() can have the required unbuffered
3216 # semantics with universal_newlines=True.
3217 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3218 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3219 # Reads
3220 self.assertEqual(txt.read(4), 'abcd')
3221 self.assertEqual(txt.readline(), 'efghi\n')
3222 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3223
3224 def test_rawio_write_through(self):
3225 # Issue #12591: with write_through=True, writes don't need a flush
3226 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3227 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3228 write_through=True)
3229 txt.write('1')
3230 txt.write('23\n4')
3231 txt.write('5')
3232 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3233
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003234 def test_bufio_write_through(self):
3235 # Issue #21396: write_through=True doesn't force a flush()
3236 # on the underlying binary buffered object.
3237 flush_called, write_called = [], []
3238 class BufferedWriter(self.BufferedWriter):
3239 def flush(self, *args, **kwargs):
3240 flush_called.append(True)
3241 return super().flush(*args, **kwargs)
3242 def write(self, *args, **kwargs):
3243 write_called.append(True)
3244 return super().write(*args, **kwargs)
3245
3246 rawio = self.BytesIO()
3247 data = b"a"
3248 bufio = BufferedWriter(rawio, len(data)*2)
3249 textio = self.TextIOWrapper(bufio, encoding='ascii',
3250 write_through=True)
3251 # write to the buffered io but don't overflow the buffer
3252 text = data.decode('ascii')
3253 textio.write(text)
3254
3255 # buffer.flush is not called with write_through=True
3256 self.assertFalse(flush_called)
3257 # buffer.write *is* called with write_through=True
3258 self.assertTrue(write_called)
3259 self.assertEqual(rawio.getvalue(), b"") # no flush
3260
3261 write_called = [] # reset
3262 textio.write(text * 10) # total content is larger than bufio buffer
3263 self.assertTrue(write_called)
3264 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3265
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02003266 def test_reconfigure_write_through(self):
3267 raw = self.MockRawIO([])
3268 t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3269 t.write('1')
3270 t.reconfigure(write_through=True) # implied flush
3271 self.assertEqual(t.write_through, True)
3272 self.assertEqual(b''.join(raw._write_stack), b'1')
3273 t.write('23')
3274 self.assertEqual(b''.join(raw._write_stack), b'123')
3275 t.reconfigure(write_through=False)
3276 self.assertEqual(t.write_through, False)
3277 t.write('45')
3278 t.flush()
3279 self.assertEqual(b''.join(raw._write_stack), b'12345')
3280 # Keeping default value
3281 t.reconfigure()
3282 t.reconfigure(write_through=None)
3283 self.assertEqual(t.write_through, False)
3284 t.reconfigure(write_through=True)
3285 t.reconfigure()
3286 t.reconfigure(write_through=None)
3287 self.assertEqual(t.write_through, True)
3288
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003289 def test_read_nonbytes(self):
3290 # Issue #17106
3291 # Crash when underlying read() returns non-bytes
3292 t = self.TextIOWrapper(self.StringIO('a'))
3293 self.assertRaises(TypeError, t.read, 1)
3294 t = self.TextIOWrapper(self.StringIO('a'))
3295 self.assertRaises(TypeError, t.readline)
3296 t = self.TextIOWrapper(self.StringIO('a'))
3297 self.assertRaises(TypeError, t.read)
3298
Oren Milmana5b4ea12017-08-25 21:14:54 +03003299 def test_illegal_encoder(self):
3300 # Issue 31271: Calling write() while the return value of encoder's
3301 # encode() is invalid shouldn't cause an assertion failure.
3302 rot13 = codecs.lookup("rot13")
3303 with support.swap_attr(rot13, '_is_text_encoding', True):
3304 t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
3305 self.assertRaises(TypeError, t.write, 'bar')
3306
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003307 def test_illegal_decoder(self):
3308 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10003309 # Bypass the early encoding check added in issue 20404
3310 def _make_illegal_wrapper():
3311 quopri = codecs.lookup("quopri")
3312 quopri._is_text_encoding = True
3313 try:
3314 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3315 newline='\n', encoding="quopri")
3316 finally:
3317 quopri._is_text_encoding = False
3318 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003319 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10003320 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003321 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10003322 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003323 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10003324 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003325 self.assertRaises(TypeError, t.read)
3326
Oren Milmanba7d7362017-08-29 11:58:27 +03003327 # Issue 31243: calling read() while the return value of decoder's
3328 # getstate() is invalid should neither crash the interpreter nor
3329 # raise a SystemError.
3330 def _make_very_illegal_wrapper(getstate_ret_val):
3331 class BadDecoder:
3332 def getstate(self):
3333 return getstate_ret_val
3334 def _get_bad_decoder(dummy):
3335 return BadDecoder()
3336 quopri = codecs.lookup("quopri")
3337 with support.swap_attr(quopri, 'incrementaldecoder',
3338 _get_bad_decoder):
3339 return _make_illegal_wrapper()
3340 t = _make_very_illegal_wrapper(42)
3341 self.assertRaises(TypeError, t.read, 42)
3342 t = _make_very_illegal_wrapper(())
3343 self.assertRaises(TypeError, t.read, 42)
3344 t = _make_very_illegal_wrapper((1, 2))
3345 self.assertRaises(TypeError, t.read, 42)
3346
Antoine Pitrou712cb732013-12-21 15:51:54 +01003347 def _check_create_at_shutdown(self, **kwargs):
3348 # Issue #20037: creating a TextIOWrapper at shutdown
3349 # shouldn't crash the interpreter.
3350 iomod = self.io.__name__
3351 code = """if 1:
3352 import codecs
3353 import {iomod} as io
3354
3355 # Avoid looking up codecs at shutdown
3356 codecs.lookup('utf-8')
3357
3358 class C:
3359 def __init__(self):
3360 self.buf = io.BytesIO()
3361 def __del__(self):
3362 io.TextIOWrapper(self.buf, **{kwargs})
3363 print("ok")
3364 c = C()
3365 """.format(iomod=iomod, kwargs=kwargs)
3366 return assert_python_ok("-c", code)
3367
Serhiy Storchakaa7930372016-07-03 22:27:26 +03003368 @support.requires_type_collecting
Antoine Pitrou712cb732013-12-21 15:51:54 +01003369 def test_create_at_shutdown_without_encoding(self):
3370 rc, out, err = self._check_create_at_shutdown()
3371 if err:
3372 # Can error out with a RuntimeError if the module state
3373 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10003374 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01003375 else:
3376 self.assertEqual("ok", out.decode().strip())
3377
Serhiy Storchakaa7930372016-07-03 22:27:26 +03003378 @support.requires_type_collecting
Antoine Pitrou712cb732013-12-21 15:51:54 +01003379 def test_create_at_shutdown_with_encoding(self):
3380 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3381 errors='strict')
3382 self.assertFalse(err)
3383 self.assertEqual("ok", out.decode().strip())
3384
Antoine Pitroub8503892014-04-29 10:14:02 +02003385 def test_read_byteslike(self):
3386 r = MemviewBytesIO(b'Just some random string\n')
3387 t = self.TextIOWrapper(r, 'utf-8')
3388
3389 # TextIOwrapper will not read the full string, because
3390 # we truncate it to a multiple of the native int size
3391 # so that we can construct a more complex memoryview.
3392 bytes_val = _to_memoryview(r.getvalue()).tobytes()
3393
3394 self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3395
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003396 def test_issue22849(self):
3397 class F(object):
3398 def readable(self): return True
3399 def writable(self): return True
3400 def seekable(self): return True
3401
3402 for i in range(10):
3403 try:
3404 self.TextIOWrapper(F(), encoding='utf-8')
3405 except Exception:
3406 pass
3407
3408 F.tell = lambda x: 0
3409 t = self.TextIOWrapper(F(), encoding='utf-8')
3410
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003411
Antoine Pitroub8503892014-04-29 10:14:02 +02003412class MemviewBytesIO(io.BytesIO):
3413 '''A BytesIO object whose read method returns memoryviews
3414 rather than bytes'''
3415
3416 def read1(self, len_):
3417 return _to_memoryview(super().read1(len_))
3418
3419 def read(self, len_):
3420 return _to_memoryview(super().read(len_))
3421
3422def _to_memoryview(buf):
3423 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3424
3425 arr = array.array('i')
3426 idx = len(buf) - len(buf) % arr.itemsize
3427 arr.frombytes(buf[:idx])
3428 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003429
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003430
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003431class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003432 io = io
Steve Dowerfe8f4c92016-09-07 09:31:52 -07003433 shutdown_error = "RuntimeError: could not find io module state"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003434
3435 def test_initialization(self):
3436 r = self.BytesIO(b"\xc3\xa9\n\n")
3437 b = self.BufferedReader(r, 1000)
3438 t = self.TextIOWrapper(b)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003439 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
3440 self.assertRaises(ValueError, t.read)
3441
Benjamin Peterson10e76b62014-12-21 20:51:50 -06003442 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
3443 self.assertRaises(Exception, repr, t)
3444
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003445 def test_garbage_collection(self):
3446 # C TextIOWrapper objects are collected, and collecting them flushes
3447 # all data to disk.
3448 # The Python version has __del__, so it ends in gc.garbage instead.
Antoine Pitrou796564c2013-07-30 19:59:21 +02003449 with support.check_warnings(('', ResourceWarning)):
3450 rawio = io.FileIO(support.TESTFN, "wb")
3451 b = self.BufferedWriter(rawio)
3452 t = self.TextIOWrapper(b, encoding="ascii")
3453 t.write("456def")
3454 t.x = t
3455 wr = weakref.ref(t)
3456 del t
3457 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003458 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00003459 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003460 self.assertEqual(f.read(), b"456def")
3461
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003462 def test_rwpair_cleared_before_textio(self):
3463 # Issue 13070: TextIOWrapper's finalization would crash when called
3464 # after the reference to the underlying BufferedRWPair's writer got
3465 # cleared by the GC.
3466 for i in range(1000):
3467 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3468 t1 = self.TextIOWrapper(b1, encoding="ascii")
3469 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3470 t2 = self.TextIOWrapper(b2, encoding="ascii")
3471 # circular references
3472 t1.buddy = t2
3473 t2.buddy = t1
3474 support.gc_collect()
3475
3476
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003477class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003478 io = pyio
Steve Dower6d5cf8a2016-09-09 09:21:01 -07003479 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003480
3481
3482class IncrementalNewlineDecoderTest(unittest.TestCase):
3483
3484 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003485 # UTF-8 specific tests for a newline decoder
3486 def _check_decode(b, s, **kwargs):
3487 # We exercise getstate() / setstate() as well as decode()
3488 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003489 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003490 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003491 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003492
Antoine Pitrou180a3362008-12-14 16:36:46 +00003493 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003494
Antoine Pitrou180a3362008-12-14 16:36:46 +00003495 _check_decode(b'\xe8', "")
3496 _check_decode(b'\xa2', "")
3497 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003498
Antoine Pitrou180a3362008-12-14 16:36:46 +00003499 _check_decode(b'\xe8', "")
3500 _check_decode(b'\xa2', "")
3501 _check_decode(b'\x88', "\u8888")
3502
3503 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003504 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3505
Antoine Pitrou180a3362008-12-14 16:36:46 +00003506 decoder.reset()
3507 _check_decode(b'\n', "\n")
3508 _check_decode(b'\r', "")
3509 _check_decode(b'', "\n", final=True)
3510 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003511
Antoine Pitrou180a3362008-12-14 16:36:46 +00003512 _check_decode(b'\r', "")
3513 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003514
Antoine Pitrou180a3362008-12-14 16:36:46 +00003515 _check_decode(b'\r\r\n', "\n\n")
3516 _check_decode(b'\r', "")
3517 _check_decode(b'\r', "\n")
3518 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003519
Antoine Pitrou180a3362008-12-14 16:36:46 +00003520 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3521 _check_decode(b'\xe8\xa2\x88', "\u8888")
3522 _check_decode(b'\n', "\n")
3523 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3524 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003525
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003526 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003527 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003528 if encoding is not None:
3529 encoder = codecs.getincrementalencoder(encoding)()
3530 def _decode_bytewise(s):
3531 # Decode one byte at a time
3532 for b in encoder.encode(s):
3533 result.append(decoder.decode(bytes([b])))
3534 else:
3535 encoder = None
3536 def _decode_bytewise(s):
3537 # Decode one char at a time
3538 for c in s:
3539 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003540 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003541 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003542 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003543 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003544 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003545 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003546 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003547 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003548 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003549 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003550 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003551 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003552 input = "abc"
3553 if encoder is not None:
3554 encoder.reset()
3555 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003556 self.assertEqual(decoder.decode(input), "abc")
3557 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003558
3559 def test_newline_decoder(self):
3560 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003561 # None meaning the IncrementalNewlineDecoder takes unicode input
3562 # rather than bytes input
3563 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003564 'utf-16', 'utf-16-le', 'utf-16-be',
3565 'utf-32', 'utf-32-le', 'utf-32-be',
3566 )
3567 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003568 decoder = enc and codecs.getincrementaldecoder(enc)()
3569 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3570 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003571 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003572 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3573 self.check_newline_decoding_utf8(decoder)
Oren Milman1d1d3e92017-08-20 18:35:36 +03003574 self.assertRaises(TypeError, decoder.setstate, 42)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003575
Antoine Pitrou66913e22009-03-06 23:40:56 +00003576 def test_newline_bytes(self):
3577 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3578 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003579 self.assertEqual(dec.newlines, None)
3580 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3581 self.assertEqual(dec.newlines, None)
3582 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3583 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003584 dec = self.IncrementalNewlineDecoder(None, translate=False)
3585 _check(dec)
3586 dec = self.IncrementalNewlineDecoder(None, translate=True)
3587 _check(dec)
3588
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003589class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3590 pass
3591
3592class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3593 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003594
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003595
Guido van Rossum01a27522007-03-07 01:00:12 +00003596# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003597
Guido van Rossum5abbf752007-08-27 17:39:33 +00003598class MiscIOTest(unittest.TestCase):
3599
Barry Warsaw40e82462008-11-20 20:14:50 +00003600 def tearDown(self):
3601 support.unlink(support.TESTFN)
3602
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003603 def test___all__(self):
3604 for name in self.io.__all__:
3605 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003606 self.assertIsNotNone(obj, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00003607 if name == "open":
3608 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003609 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003610 self.assertTrue(issubclass(obj, Exception), name)
3611 elif not name.startswith("SEEK_"):
3612 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003613
Barry Warsaw40e82462008-11-20 20:14:50 +00003614 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003615 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003616 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003617 f.close()
3618
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003619 with support.check_warnings(('', DeprecationWarning)):
3620 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003621 self.assertEqual(f.name, support.TESTFN)
3622 self.assertEqual(f.buffer.name, support.TESTFN)
3623 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3624 self.assertEqual(f.mode, "U")
3625 self.assertEqual(f.buffer.mode, "rb")
3626 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003627 f.close()
3628
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003629 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003630 self.assertEqual(f.mode, "w+")
3631 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3632 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003633
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003634 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003635 self.assertEqual(g.mode, "wb")
3636 self.assertEqual(g.raw.mode, "wb")
3637 self.assertEqual(g.name, f.fileno())
3638 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003639 f.close()
3640 g.close()
3641
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003642 def test_io_after_close(self):
3643 for kwargs in [
3644 {"mode": "w"},
3645 {"mode": "wb"},
3646 {"mode": "w", "buffering": 1},
3647 {"mode": "w", "buffering": 2},
3648 {"mode": "wb", "buffering": 0},
3649 {"mode": "r"},
3650 {"mode": "rb"},
3651 {"mode": "r", "buffering": 1},
3652 {"mode": "r", "buffering": 2},
3653 {"mode": "rb", "buffering": 0},
3654 {"mode": "w+"},
3655 {"mode": "w+b"},
3656 {"mode": "w+", "buffering": 1},
3657 {"mode": "w+", "buffering": 2},
3658 {"mode": "w+b", "buffering": 0},
3659 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003660 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003661 f.close()
3662 self.assertRaises(ValueError, f.flush)
3663 self.assertRaises(ValueError, f.fileno)
3664 self.assertRaises(ValueError, f.isatty)
3665 self.assertRaises(ValueError, f.__iter__)
3666 if hasattr(f, "peek"):
3667 self.assertRaises(ValueError, f.peek, 1)
3668 self.assertRaises(ValueError, f.read)
3669 if hasattr(f, "read1"):
3670 self.assertRaises(ValueError, f.read1, 1024)
Martin Panterccb2c0e2016-10-20 23:48:14 +00003671 self.assertRaises(ValueError, f.read1)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003672 if hasattr(f, "readall"):
3673 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003674 if hasattr(f, "readinto"):
3675 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003676 if hasattr(f, "readinto1"):
3677 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003678 self.assertRaises(ValueError, f.readline)
3679 self.assertRaises(ValueError, f.readlines)
Xiang Zhang026435c2017-04-15 12:47:28 +08003680 self.assertRaises(ValueError, f.readlines, 1)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003681 self.assertRaises(ValueError, f.seek, 0)
3682 self.assertRaises(ValueError, f.tell)
3683 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003684 self.assertRaises(ValueError, f.write,
3685 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003686 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003687 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003688
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003689 def test_blockingioerror(self):
3690 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003691 class C(str):
3692 pass
3693 c = C("")
3694 b = self.BlockingIOError(1, c)
3695 c.b = b
3696 b.c = c
3697 wr = weakref.ref(c)
3698 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003699 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003700 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003701
3702 def test_abcs(self):
3703 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003704 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3705 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3706 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3707 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003708
3709 def _check_abc_inheritance(self, abcmodule):
3710 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003711 self.assertIsInstance(f, abcmodule.IOBase)
3712 self.assertIsInstance(f, abcmodule.RawIOBase)
3713 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3714 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003715 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003716 self.assertIsInstance(f, abcmodule.IOBase)
3717 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3718 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3719 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003720 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003721 self.assertIsInstance(f, abcmodule.IOBase)
3722 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3723 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3724 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003725
3726 def test_abc_inheritance(self):
3727 # Test implementations inherit from their respective ABCs
3728 self._check_abc_inheritance(self)
3729
3730 def test_abc_inheritance_official(self):
3731 # Test implementations inherit from the official ABCs of the
3732 # baseline "io" module.
3733 self._check_abc_inheritance(io)
3734
Antoine Pitroue033e062010-10-29 10:38:18 +00003735 def _check_warn_on_dealloc(self, *args, **kwargs):
3736 f = open(*args, **kwargs)
3737 r = repr(f)
3738 with self.assertWarns(ResourceWarning) as cm:
3739 f = None
3740 support.gc_collect()
3741 self.assertIn(r, str(cm.warning.args[0]))
3742
3743 def test_warn_on_dealloc(self):
3744 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3745 self._check_warn_on_dealloc(support.TESTFN, "wb")
3746 self._check_warn_on_dealloc(support.TESTFN, "w")
3747
3748 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3749 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003750 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003751 for fd in fds:
3752 try:
3753 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003754 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003755 if e.errno != errno.EBADF:
3756 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003757 self.addCleanup(cleanup_fds)
3758 r, w = os.pipe()
3759 fds += r, w
3760 self._check_warn_on_dealloc(r, *args, **kwargs)
3761 # When using closefd=False, there's no warning
3762 r, w = os.pipe()
3763 fds += r, w
Serhiy Storchaka94a619d2016-02-11 13:11:44 +02003764 with support.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00003765 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00003766
3767 def test_warn_on_dealloc_fd(self):
3768 self._check_warn_on_dealloc_fd("rb", buffering=0)
3769 self._check_warn_on_dealloc_fd("rb")
3770 self._check_warn_on_dealloc_fd("r")
3771
3772
Antoine Pitrou243757e2010-11-05 21:15:39 +00003773 def test_pickling(self):
3774 # Pickling file objects is forbidden
3775 for kwargs in [
3776 {"mode": "w"},
3777 {"mode": "wb"},
3778 {"mode": "wb", "buffering": 0},
3779 {"mode": "r"},
3780 {"mode": "rb"},
3781 {"mode": "rb", "buffering": 0},
3782 {"mode": "w+"},
3783 {"mode": "w+b"},
3784 {"mode": "w+b", "buffering": 0},
3785 ]:
3786 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3787 with self.open(support.TESTFN, **kwargs) as f:
3788 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3789
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003790 def test_nonblock_pipe_write_bigbuf(self):
3791 self._test_nonblock_pipe_write(16*1024)
3792
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003793 def test_nonblock_pipe_write_smallbuf(self):
3794 self._test_nonblock_pipe_write(1024)
3795
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003796 @unittest.skipUnless(hasattr(os, 'set_blocking'),
3797 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003798 def _test_nonblock_pipe_write(self, bufsize):
3799 sent = []
3800 received = []
3801 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003802 os.set_blocking(r, False)
3803 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003804
3805 # To exercise all code paths in the C implementation we need
3806 # to play with buffer sizes. For instance, if we choose a
3807 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3808 # then we will never get a partial write of the buffer.
3809 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3810 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3811
3812 with rf, wf:
3813 for N in 9999, 73, 7574:
3814 try:
3815 i = 0
3816 while True:
3817 msg = bytes([i % 26 + 97]) * N
3818 sent.append(msg)
3819 wf.write(msg)
3820 i += 1
3821
3822 except self.BlockingIOError as e:
3823 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003824 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003825 sent[-1] = sent[-1][:e.characters_written]
3826 received.append(rf.read())
3827 msg = b'BLOCKED'
3828 wf.write(msg)
3829 sent.append(msg)
3830
3831 while True:
3832 try:
3833 wf.flush()
3834 break
3835 except self.BlockingIOError as e:
3836 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003837 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003838 self.assertEqual(e.characters_written, 0)
3839 received.append(rf.read())
3840
3841 received += iter(rf.read, None)
3842
3843 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003844 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003845 self.assertTrue(wf.closed)
3846 self.assertTrue(rf.closed)
3847
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003848 def test_create_fail(self):
3849 # 'x' mode fails if file is existing
3850 with self.open(support.TESTFN, 'w'):
3851 pass
3852 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3853
3854 def test_create_writes(self):
3855 # 'x' mode opens for writing
3856 with self.open(support.TESTFN, 'xb') as f:
3857 f.write(b"spam")
3858 with self.open(support.TESTFN, 'rb') as f:
3859 self.assertEqual(b"spam", f.read())
3860
Christian Heimes7b648752012-09-10 14:48:43 +02003861 def test_open_allargs(self):
3862 # there used to be a buffer overflow in the parser for rawmode
3863 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3864
3865
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003866class CMiscIOTest(MiscIOTest):
3867 io = io
3868
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03003869 def test_readinto_buffer_overflow(self):
3870 # Issue #18025
3871 class BadReader(self.io.BufferedIOBase):
3872 def read(self, n=-1):
3873 return b'x' * 10**6
3874 bufio = BadReader()
3875 b = bytearray(2)
3876 self.assertRaises(ValueError, bufio.readinto, b)
3877
Antoine Pitrou25f85d42015-04-13 19:41:47 +02003878 def check_daemon_threads_shutdown_deadlock(self, stream_name):
3879 # Issue #23309: deadlocks at shutdown should be avoided when a
3880 # daemon thread and the main thread both write to a file.
3881 code = """if 1:
3882 import sys
3883 import time
3884 import threading
Victor Stinner2a1aed02017-04-21 17:59:23 +02003885 from test.support import SuppressCrashReport
Antoine Pitrou25f85d42015-04-13 19:41:47 +02003886
3887 file = sys.{stream_name}
3888
3889 def run():
3890 while True:
3891 file.write('.')
3892 file.flush()
3893
Victor Stinner2a1aed02017-04-21 17:59:23 +02003894 crash = SuppressCrashReport()
3895 crash.__enter__()
3896 # don't call __exit__(): the crash occurs at Python shutdown
3897
Antoine Pitrou25f85d42015-04-13 19:41:47 +02003898 thread = threading.Thread(target=run)
3899 thread.daemon = True
3900 thread.start()
3901
3902 time.sleep(0.5)
3903 file.write('!')
3904 file.flush()
3905 """.format_map(locals())
3906 res, _ = run_python_until_end("-c", code)
3907 err = res.err.decode()
3908 if res.rc != 0:
3909 # Failure: should be a fatal error
3910 self.assertIn("Fatal Python error: could not acquire lock "
3911 "for <_io.BufferedWriter name='<{stream_name}>'> "
3912 "at interpreter shutdown, possibly due to "
3913 "daemon threads".format_map(locals()),
3914 err)
3915 else:
3916 self.assertFalse(err.strip('.!'))
3917
3918 def test_daemon_threads_shutdown_stdout_deadlock(self):
3919 self.check_daemon_threads_shutdown_deadlock('stdout')
3920
3921 def test_daemon_threads_shutdown_stderr_deadlock(self):
3922 self.check_daemon_threads_shutdown_deadlock('stderr')
3923
3924
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003925class PyMiscIOTest(MiscIOTest):
3926 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003927
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003928
3929@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3930class SignalsTest(unittest.TestCase):
3931
3932 def setUp(self):
3933 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3934
3935 def tearDown(self):
3936 signal.signal(signal.SIGALRM, self.oldalrm)
3937
3938 def alarm_interrupt(self, sig, frame):
3939 1/0
3940
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003941 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3942 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00003943 invokes the signal handler, and bubbles up the exception raised
3944 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003945 read_results = []
3946 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02003947 if hasattr(signal, 'pthread_sigmask'):
3948 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003949 s = os.read(r, 1)
3950 read_results.append(s)
3951 t = threading.Thread(target=_read)
3952 t.daemon = True
3953 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00003954 fdopen_kwargs["closefd"] = False
Victor Stinner9c182a12015-03-16 18:03:06 +01003955 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003956 try:
3957 wio = self.io.open(w, **fdopen_kwargs)
3958 t.start()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003959 # Fill the pipe enough that the write will be blocking.
3960 # It will be interrupted by the timer armed above. Since the
3961 # other thread has read one byte, the low-level write will
3962 # return with a successful (partial) result rather than an EINTR.
3963 # The buffered IO layer must check for pending signal
3964 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02003965 signal.alarm(1)
3966 try:
Victor Stinner9c182a12015-03-16 18:03:06 +01003967 self.assertRaises(ZeroDivisionError, wio.write, large_data)
Victor Stinner775b2dd2013-07-15 19:53:13 +02003968 finally:
3969 signal.alarm(0)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003970 t.join()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003971 # We got one byte, get another one and check that it isn't a
3972 # repeat of the first one.
3973 read_results.append(os.read(r, 1))
3974 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3975 finally:
3976 os.close(w)
3977 os.close(r)
3978 # This is deliberate. If we didn't close the file descriptor
3979 # before closing wio, wio would try to flush its internal
3980 # buffer, and block again.
3981 try:
3982 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003983 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003984 if e.errno != errno.EBADF:
3985 raise
3986
3987 def test_interrupted_write_unbuffered(self):
3988 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3989
3990 def test_interrupted_write_buffered(self):
3991 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3992
Victor Stinner6ab72862014-09-03 23:32:28 +02003993 # Issue #22331: The test hangs on FreeBSD 7.2
3994 @support.requires_freebsd_version(8)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003995 def test_interrupted_write_text(self):
3996 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3997
Brett Cannon31f59292011-02-21 19:29:56 +00003998 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003999 def check_reentrant_write(self, data, **fdopen_kwargs):
4000 def on_alarm(*args):
4001 # Will be called reentrantly from the same thread
4002 wio.write(data)
4003 1/0
4004 signal.signal(signal.SIGALRM, on_alarm)
4005 r, w = os.pipe()
4006 wio = self.io.open(w, **fdopen_kwargs)
4007 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004008 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004009 # Either the reentrant call to wio.write() fails with RuntimeError,
4010 # or the signal handler raises ZeroDivisionError.
4011 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
4012 while 1:
4013 for i in range(100):
4014 wio.write(data)
4015 wio.flush()
4016 # Make sure the buffer doesn't fill up and block further writes
4017 os.read(r, len(data) * 100)
4018 exc = cm.exception
4019 if isinstance(exc, RuntimeError):
4020 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
4021 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004022 signal.alarm(0)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004023 wio.close()
4024 os.close(r)
4025
4026 def test_reentrant_write_buffered(self):
4027 self.check_reentrant_write(b"xy", mode="wb")
4028
4029 def test_reentrant_write_text(self):
4030 self.check_reentrant_write("xy", mode="w", encoding="ascii")
4031
Antoine Pitrou707ce822011-02-25 21:24:11 +00004032 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
4033 """Check that a buffered read, when it gets interrupted (either
4034 returning a partial result or EINTR), properly invokes the signal
4035 handler and retries if the latter returned successfully."""
4036 r, w = os.pipe()
4037 fdopen_kwargs["closefd"] = False
4038 def alarm_handler(sig, frame):
4039 os.write(w, b"bar")
4040 signal.signal(signal.SIGALRM, alarm_handler)
4041 try:
4042 rio = self.io.open(r, **fdopen_kwargs)
4043 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004044 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004045 # Expected behaviour:
4046 # - first raw read() returns partial b"foo"
4047 # - second raw read() returns EINTR
4048 # - third raw read() returns b"bar"
4049 self.assertEqual(decode(rio.read(6)), "foobar")
4050 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004051 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004052 rio.close()
4053 os.close(w)
4054 os.close(r)
4055
Antoine Pitrou20db5112011-08-19 20:32:34 +02004056 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004057 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
4058 mode="rb")
4059
Antoine Pitrou20db5112011-08-19 20:32:34 +02004060 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004061 self.check_interrupted_read_retry(lambda x: x,
4062 mode="r")
4063
Antoine Pitrou707ce822011-02-25 21:24:11 +00004064 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
4065 """Check that a buffered write, when it gets interrupted (either
4066 returning a partial result or EINTR), properly invokes the signal
4067 handler and retries if the latter returned successfully."""
4068 select = support.import_module("select")
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004069
Antoine Pitrou707ce822011-02-25 21:24:11 +00004070 # A quantity that exceeds the buffer size of an anonymous pipe's
4071 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02004072 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00004073 r, w = os.pipe()
4074 fdopen_kwargs["closefd"] = False
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004075
Antoine Pitrou707ce822011-02-25 21:24:11 +00004076 # We need a separate thread to read from the pipe and allow the
4077 # write() to finish. This thread is started after the SIGALRM is
4078 # received (forcing a first EINTR in write()).
4079 read_results = []
4080 write_finished = False
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004081 error = None
Antoine Pitrou707ce822011-02-25 21:24:11 +00004082 def _read():
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004083 try:
4084 while not write_finished:
4085 while r in select.select([r], [], [], 1.0)[0]:
4086 s = os.read(r, 1024)
4087 read_results.append(s)
4088 except BaseException as exc:
4089 nonlocal error
4090 error = exc
Antoine Pitrou707ce822011-02-25 21:24:11 +00004091 t = threading.Thread(target=_read)
4092 t.daemon = True
4093 def alarm1(sig, frame):
4094 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004095 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004096 def alarm2(sig, frame):
4097 t.start()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004098
4099 large_data = item * N
Antoine Pitrou707ce822011-02-25 21:24:11 +00004100 signal.signal(signal.SIGALRM, alarm1)
4101 try:
4102 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004103 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004104 # Expected behaviour:
4105 # - first raw write() is partial (because of the limited pipe buffer
4106 # and the first alarm)
4107 # - second raw write() returns EINTR (because of the second alarm)
4108 # - subsequent write()s are successful (either partial or complete)
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004109 written = wio.write(large_data)
4110 self.assertEqual(N, written)
4111
Antoine Pitrou707ce822011-02-25 21:24:11 +00004112 wio.flush()
4113 write_finished = True
4114 t.join()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004115
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004116 self.assertIsNone(error)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004117 self.assertEqual(N, sum(len(x) for x in read_results))
4118 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004119 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004120 write_finished = True
4121 os.close(w)
4122 os.close(r)
4123 # This is deliberate. If we didn't close the file descriptor
4124 # before closing wio, wio would try to flush its internal
4125 # buffer, and could block (in case of failure).
4126 try:
4127 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004128 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00004129 if e.errno != errno.EBADF:
4130 raise
4131
Antoine Pitrou20db5112011-08-19 20:32:34 +02004132 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004133 self.check_interrupted_write_retry(b"x", mode="wb")
4134
Antoine Pitrou20db5112011-08-19 20:32:34 +02004135 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004136 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4137
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004138
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004139class CSignalsTest(SignalsTest):
4140 io = io
4141
4142class PySignalsTest(SignalsTest):
4143 io = pyio
4144
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004145 # Handling reentrancy issues would slow down _pyio even more, so the
4146 # tests are disabled.
4147 test_reentrant_write_buffered = None
4148 test_reentrant_write_text = None
4149
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004150
Ezio Melottidaa42c72013-03-23 16:30:16 +02004151def load_tests(*args):
Gregory P. Smith1bef9072015-04-14 13:24:34 -07004152 tests = (CIOTest, PyIOTest, APIMismatchTest,
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004153 CBufferedReaderTest, PyBufferedReaderTest,
4154 CBufferedWriterTest, PyBufferedWriterTest,
4155 CBufferedRWPairTest, PyBufferedRWPairTest,
4156 CBufferedRandomTest, PyBufferedRandomTest,
4157 StatefulIncrementalDecoderTest,
4158 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
4159 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004160 CMiscIOTest, PyMiscIOTest,
4161 CSignalsTest, PySignalsTest,
4162 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004163
4164 # Put the namespaces of the IO module we are testing and some useful mock
4165 # classes in the __dict__ of each test.
4166 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
benfogle9703f092017-11-10 16:03:40 -05004167 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
4168 SlowFlushRawIO)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004169 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
4170 c_io_ns = {name : getattr(io, name) for name in all_members}
4171 py_io_ns = {name : getattr(pyio, name) for name in all_members}
4172 globs = globals()
4173 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
4174 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
4175 # Avoid turning open into a bound method.
4176 py_io_ns["open"] = pyio.OpenWrapper
4177 for test in tests:
4178 if test.__name__.startswith("C"):
4179 for name, obj in c_io_ns.items():
4180 setattr(test, name, obj)
4181 elif test.__name__.startswith("Py"):
4182 for name, obj in py_io_ns.items():
4183 setattr(test, name, obj)
4184
Ezio Melottidaa42c72013-03-23 16:30:16 +02004185 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
4186 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004187
4188if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02004189 unittest.main()