blob: 4be5631ab4464217d0fe8fa61f05af73a2d2c381 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020031import threading
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000032import time
Guido van Rossum28524c72007-02-27 05:47:44 +000033import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000034import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020035import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020036from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000038from test import support
Berker Peksagce643912015-05-06 06:33:17 +030039from test.support.script_helper import assert_python_ok, run_python_until_end
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000040
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000041import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000042import io # C implementation of io
43import _pyio as pyio # Python implementation of io
Guido van Rossuma9e20242007-03-08 00:43:48 +000044
try:
    import ctypes
except ImportError:
    def byteslike(*pos, **kw):
        """Create a bytes-like object from ``bytes(*pos, **kw)``.

        Fallback used when ctypes cannot be imported: the data is wrapped
        in an array of signed bytes.
        """
        raw = bytes(*pos, **kw)
        return array.array("b", raw)
else:
    class EmptyStruct(ctypes.Structure):
        """ctypes structure declaring no fields; byteslike() resizes it."""

    def byteslike(*pos, **kw):
        """Create a bytes-like object having no string or sequence methods"""
        payload = bytes(*pos, **kw)
        holder = EmptyStruct()
        # Resize the structure's buffer to the payload length, then copy the
        # payload in through a byte-typed memoryview of the structure.
        ctypes.resize(holder, len(payload))
        view = memoryview(holder).cast("B")
        view[:] = payload
        return holder
60
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000061def _default_chunk_size():
62 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000063 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000064 return f._CHUNK_SIZE
65
66
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto()."""

    def __init__(self, read_stack=()):
        # Scripted chunks handed out by readinto(), in order.
        self._read_stack = list(read_stack)
        # Every chunk passed to write(), as bytes.
        self._write_stack = []
        # Total number of read calls, and how many of them happened after
        # the script was exhausted.
        self._reads = 0
        self._extraneous_reads = 0

    # -- capabilities ------------------------------------------------------

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def fileno(self):
        return 42

    # -- positioning (dummy values) ----------------------------------------

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def truncate(self, pos=None):
        return pos

    # -- data transfer -----------------------------------------------------

    def write(self, b):
        self._write_stack.append(bytes(b))
        return len(b)

    def readinto(self, buf):
        """Copy the next scripted chunk into *buf*.

        Returns the number of bytes copied, 0 when the script is exhausted,
        or None when the next scripted item is None.  A chunk larger than
        *buf* is delivered piecewise; the remainder stays at the head of
        the script.
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        head = self._read_stack[0]
        if head is None:
            self._read_stack.pop(0)
            return None
        size = len(head)
        if size > capacity:
            # Fill the whole buffer and keep the rest for the next call.
            buf[:] = head[:capacity]
            self._read_stack[0] = head[capacity:]
            return capacity
        self._read_stack.pop(0)
        buf[:size] = head
        return size
122
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead on top of the C implementation's RawIOBase."""
125
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead on top of the Python implementation's RawIOBase."""
128
129
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus a scripted read() method."""

    def read(self, n=None):
        """Return the next scripted chunk, ignoring the requested size *n*.

        Every call is counted in ``_reads``.  Once the script is exhausted,
        ``_extraneous_reads`` is incremented and b"" is returned.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an empty script counts as an extraneous read.  Any other
            # exception is a genuine error and must not be disguised as EOF
            # (a bare ``except:`` would even swallow KeyboardInterrupt).
            self._extraneous_reads += 1
            return b""
139
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO on top of the C implementation's RawIOBase."""
142
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO on top of the Python implementation's RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000145
Guido van Rossuma9e20242007-03-08 00:43:48 +0000146
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods deliberately report wrong results."""

    def write(self, b):
        # Claim twice as many bytes as the parent reports.
        accepted = super().write(b)
        return accepted * 2

    def read(self, n=None):
        # Hand back the parent's chunk repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Let the parent fill the buffer, then report five times its length.
        super().readinto(buf)
        return 5 * len(buf)
162 return len(buf) * 5
163
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO on top of the C implementation's RawIOBase."""
166
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO on top of the Python implementation's RawIOBase."""
169
170
class SlowFlushRawIO(MockRawIO):
    """MockRawIO whose flush() signals an event and then sleeps."""

    def __init__(self):
        super().__init__()
        # Set as soon as flush() has been entered.
        self.in_flush = threading.Event()

    def flush(self):
        entered = self.in_flush
        entered.set()
        time.sleep(0.25)
178 time.sleep(0.25)
179
class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
    """SlowFlushRawIO on top of the C implementation's RawIOBase."""
182
class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
    """SlowFlushRawIO on top of the Python implementation's RawIOBase."""
185
186
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() marks it closed and raises OSError."""

    closed = 0

    def close(self):
        # Later calls are silent no-ops.
        if self.closed:
            return
        self.closed = 1
        raise OSError
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200193 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000194
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO on top of the C implementation's RawIOBase."""
197
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO on top of the Python implementation's RawIOBase."""
200
201
class MockFileIO:
    """Mixin that logs the size of every read()/readinto() result in
    ``read_history``; the actual I/O is delegated to the next class in
    the MRO."""

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        # A None result is logged as None, anything else by its length.
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000217
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO on top of the C implementation's BytesIO."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000220
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO on top of the Python implementation's BytesIO."""
223
224
class MockUnseekableIO:
    """Mixin that reports itself as not seekable.

    seek(), tell() and truncate() raise ``self.UnsupportedOperation``, which
    the concrete subclass must provide.
    """

    def seekable(self):
        return False

    def seek(self, *_ignored):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def tell(self, *_ignored):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def truncate(self, *_ignored):
        error = self.UnsupportedOperation("not seekable")
        raise error
237
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """MockUnseekableIO on top of the C implementation's BytesIO."""

    UnsupportedOperation = io.UnsupportedOperation
240
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """MockUnseekableIO on top of the Python implementation's BytesIO."""

    UnsupportedOperation = pyio.UnsupportedOperation
243
244
class MockNonBlockWriterIO:
    """Writer mock that can simulate a would-block condition on a chosen
    byte sequence."""

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def pop_written(self):
        """Return everything written so far and forget it."""
        written = b"".join(self._write_stack)
        self._write_stack.clear()
        return written

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def write(self, b):
        """Record *b* and return the number of bytes accepted.

        While a blocker is armed and present in *b*: data before it is
        accepted as a partial write; if *b* starts with the blocker, the
        blocker is disarmed and None is returned.
        """
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            try:
                cut = payload.index(blocker)
            except ValueError:
                cut = None
            if cut is not None:
                if cut > 0:
                    # write data up to the first blocker
                    self._write_stack.append(payload[:cut])
                    return cut
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(payload)
        return len(payload)
287 return len(b)
288
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO on top of the C implementation's RawIOBase."""

    BlockingIOError = io.BlockingIOError
291
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO on top of the Python implementation's RawIOBase."""

    BlockingIOError = pyio.BlockingIOError
294
Guido van Rossuma9e20242007-03-08 00:43:48 +0000295
Guido van Rossum28524c72007-02-27 05:47:44 +0000296class IOTest(unittest.TestCase):
297
Neal Norwitze7789b12008-03-24 06:18:09 +0000298 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000299 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000300
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000301 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000302 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000303
Guido van Rossum28524c72007-02-27 05:47:44 +0000304 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000305 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000306 f.truncate(0)
307 self.assertEqual(f.tell(), 5)
308 f.seek(0)
309
310 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000311 self.assertEqual(f.seek(0), 0)
312 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000313 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000314 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000315 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000316 buffer = bytearray(b" world\n\n\n")
317 self.assertEqual(f.write(buffer), 9)
318 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000319 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000320 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000321 self.assertEqual(f.seek(-1, 2), 13)
322 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000323
Guido van Rossum87429772007-04-10 21:06:59 +0000324 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000325 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000326 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000327
Guido van Rossum9b76da62007-04-11 01:09:03 +0000328 def read_ops(self, f, buffered=False):
329 data = f.read(5)
330 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000331 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000332 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000333 self.assertEqual(bytes(data), b" worl")
334 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000335 self.assertEqual(f.readinto(data), 2)
336 self.assertEqual(len(data), 5)
337 self.assertEqual(data[:2], b"d\n")
338 self.assertEqual(f.seek(0), 0)
339 self.assertEqual(f.read(20), b"hello world\n")
340 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000341 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000342 self.assertEqual(f.seek(-6, 2), 6)
343 self.assertEqual(f.read(5), b"world")
344 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000345 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000346 self.assertEqual(f.seek(-6, 1), 5)
347 self.assertEqual(f.read(5), b" worl")
348 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000349 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000350 if buffered:
351 f.seek(0)
352 self.assertEqual(f.read(), b"hello world\n")
353 f.seek(6)
354 self.assertEqual(f.read(), b"world\n")
355 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000356 f.seek(0)
357 data = byteslike(5)
358 self.assertEqual(f.readinto1(data), 5)
359 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000360
Guido van Rossum34d69e52007-04-10 20:08:41 +0000361 LARGE = 2**31
362
Guido van Rossum53807da2007-04-10 19:01:47 +0000363 def large_file_ops(self, f):
364 assert f.readable()
365 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100366 try:
367 self.assertEqual(f.seek(self.LARGE), self.LARGE)
368 except (OverflowError, ValueError):
369 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000370 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000371 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000372 self.assertEqual(f.tell(), self.LARGE + 3)
373 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000374 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000375 self.assertEqual(f.tell(), self.LARGE + 2)
376 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000377 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000378 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000379 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
380 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000381 self.assertEqual(f.read(2), b"x")
382
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000383 def test_invalid_operations(self):
384 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000385 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000386 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000387 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000388 self.assertRaises(exc, fp.read)
389 self.assertRaises(exc, fp.readline)
390 with self.open(support.TESTFN, "wb", buffering=0) as fp:
391 self.assertRaises(exc, fp.read)
392 self.assertRaises(exc, fp.readline)
393 with self.open(support.TESTFN, "rb", buffering=0) as fp:
394 self.assertRaises(exc, fp.write, b"blah")
395 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000396 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000397 self.assertRaises(exc, fp.write, b"blah")
398 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000399 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000400 self.assertRaises(exc, fp.write, "blah")
401 self.assertRaises(exc, fp.writelines, ["blah\n"])
402 # Non-zero seeking from current or end pos
403 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
404 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000405
    def test_optional_abilities(self):
        # Test for OSError when optional APIs are not supported
        # The purpose of this test is to try fileno(), reading, writing and
        # seeking operations with various objects that indicate they do not
        # support these operations.

        # Each helper below builds one object under test.

        def pipe_reader():
            [r, w] = os.pipe()
            os.close(w)  # So that read() is harmless
            return self.FileIO(r, "r")

        def pipe_writer():
            [r, w] = os.pipe()
            self.addCleanup(os.close, r)
            # Guarantee that we can write into the pipe without blocking
            thread = threading.Thread(target=os.read, args=(r, 100))
            thread.start()
            self.addCleanup(thread.join)
            return self.FileIO(w, "w")

        def buffered_reader():
            return self.BufferedReader(self.MockUnseekableIO())

        def buffered_writer():
            return self.BufferedWriter(self.MockUnseekableIO())

        def buffered_random():
            return self.BufferedRandom(self.BytesIO())

        def buffered_rw_pair():
            return self.BufferedRWPair(self.MockUnseekableIO(),
                self.MockUnseekableIO())

        def text_reader():
            # Take writable()/write() straight from BufferedIOBase instead
            # of the versions the mock class would otherwise provide.
            class UnseekableReader(self.MockUnseekableIO):
                writable = self.BufferedIOBase.writable
                write = self.BufferedIOBase.write
            return self.TextIOWrapper(UnseekableReader(), "ascii")

        def text_writer():
            # Same idea as text_reader(), for readable()/read().
            class UnseekableWriter(self.MockUnseekableIO):
                readable = self.BufferedIOBase.readable
                read = self.BufferedIOBase.read
            return self.TextIOWrapper(UnseekableWriter(), "ascii")

        # (factory, abilities) pairs.  Ability letters as used below:
        # "f" = fileno() works, "r" = readable, "w" = writable,
        # "s" = seekable.
        tests = (
            (pipe_reader, "fr"), (pipe_writer, "fw"),
            (buffered_reader, "r"), (buffered_writer, "w"),
            (buffered_random, "rws"), (buffered_rw_pair, "rw"),
            (text_reader, "r"), (text_writer, "w"),
            (self.BytesIO, "rws"), (self.StringIO, "rws"),
        )
        for [test, abilities] in tests:
            with self.subTest(test), test() as obj:
                readable = "r" in abilities
                self.assertEqual(obj.readable(), readable)
                writable = "w" in abilities
                self.assertEqual(obj.writable(), writable)

                # Pick a payload matching the stream kind (text vs. bytes).
                if isinstance(obj, self.TextIOBase):
                    data = "3"
                elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
                    data = b"3"
                else:
                    self.fail("Unknown base class")

                if "f" in abilities:
                    obj.fileno()
                else:
                    self.assertRaises(OSError, obj.fileno)

                if readable:
                    obj.read(1)
                    obj.read()
                else:
                    self.assertRaises(OSError, obj.read, 1)
                    self.assertRaises(OSError, obj.read)

                if writable:
                    obj.write(data)
                else:
                    self.assertRaises(OSError, obj.write, data)

                if sys.platform.startswith("win") and test in (
                        pipe_reader, pipe_writer):
                    # Pipes seem to appear as seekable on Windows
                    continue
                seekable = "s" in abilities
                self.assertEqual(obj.seekable(), seekable)

                if seekable:
                    obj.tell()
                    obj.seek(0)
                else:
                    self.assertRaises(OSError, obj.tell)
                    self.assertRaises(OSError, obj.seek, 0)

                # truncate() is expected to work only when the object is
                # both writable and seekable.
                if writable and seekable:
                    obj.truncate()
                    obj.truncate(0)
                else:
                    self.assertRaises(OSError, obj.truncate)
                    self.assertRaises(OSError, obj.truncate, 0)
509
Antoine Pitrou13348842012-01-29 18:36:34 +0100510 def test_open_handles_NUL_chars(self):
511 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300512 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100513
514 bytes_fn = bytes(fn_with_NUL, 'ascii')
515 with warnings.catch_warnings():
516 warnings.simplefilter("ignore", DeprecationWarning)
517 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100518
Guido van Rossum28524c72007-02-27 05:47:44 +0000519 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000520 with self.open(support.TESTFN, "wb", buffering=0) as f:
521 self.assertEqual(f.readable(), False)
522 self.assertEqual(f.writable(), True)
523 self.assertEqual(f.seekable(), True)
524 self.write_ops(f)
525 with self.open(support.TESTFN, "rb", buffering=0) as f:
526 self.assertEqual(f.readable(), True)
527 self.assertEqual(f.writable(), False)
528 self.assertEqual(f.seekable(), True)
529 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000530
Guido van Rossum87429772007-04-10 21:06:59 +0000531 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000532 with self.open(support.TESTFN, "wb") as f:
533 self.assertEqual(f.readable(), False)
534 self.assertEqual(f.writable(), True)
535 self.assertEqual(f.seekable(), True)
536 self.write_ops(f)
537 with self.open(support.TESTFN, "rb") as f:
538 self.assertEqual(f.readable(), True)
539 self.assertEqual(f.writable(), False)
540 self.assertEqual(f.seekable(), True)
541 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000542
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000543 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000544 with self.open(support.TESTFN, "wb") as f:
545 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
546 with self.open(support.TESTFN, "rb") as f:
547 self.assertEqual(f.readline(), b"abc\n")
548 self.assertEqual(f.readline(10), b"def\n")
549 self.assertEqual(f.readline(2), b"xy")
550 self.assertEqual(f.readline(4), b"zzy\n")
551 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000552 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000553 self.assertRaises(TypeError, f.readline, 5.3)
554 with self.open(support.TESTFN, "r") as f:
555 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000556
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300557 def test_readline_nonsizeable(self):
558 # Issue #30061
559 # Crash when readline() returns an object without __len__
560 class R(self.IOBase):
561 def readline(self):
562 return None
563 self.assertRaises((TypeError, StopIteration), next, R())
564
565 def test_next_nonsizeable(self):
566 # Issue #30061
567 # Crash when __next__() returns an object without __len__
568 class R(self.IOBase):
569 def __next__(self):
570 return None
571 self.assertRaises(TypeError, R().readlines, 1)
572
Guido van Rossum28524c72007-02-27 05:47:44 +0000573 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000574 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000575 self.write_ops(f)
576 data = f.getvalue()
577 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000578 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000579 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000580
Guido van Rossum53807da2007-04-10 19:01:47 +0000581 def test_large_file_ops(self):
Mike53f7a7c2017-12-14 14:04:53 +0300582 # On Windows and Mac OSX this test consumes large resources; It takes
Victor Stinner8c663fd2017-11-08 14:44:44 -0800583 # a long time to build the >2 GiB file and takes >2 GiB of disk space
Guido van Rossum34d69e52007-04-10 20:08:41 +0000584 # therefore the resource must be enabled to run this test.
585 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600586 support.requires(
587 'largefile',
588 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000589 with self.open(support.TESTFN, "w+b", 0) as f:
590 self.large_file_ops(f)
591 with self.open(support.TESTFN, "w+b") as f:
592 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000593
594 def test_with_open(self):
595 for bufsize in (0, 1, 100):
596 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000597 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000598 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000599 self.assertEqual(f.closed, True)
600 f = None
601 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000602 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000603 1/0
604 except ZeroDivisionError:
605 self.assertEqual(f.closed, True)
606 else:
607 self.fail("1/0 didn't raise an exception")
608
Antoine Pitrou08838b62009-01-21 00:55:13 +0000609 # issue 5008
610 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000611 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000612 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000613 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000614 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000615 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000616 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000617 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300618 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000619
Guido van Rossum87429772007-04-10 21:06:59 +0000620 def test_destructor(self):
621 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000622 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000623 def __del__(self):
624 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000625 try:
626 f = super().__del__
627 except AttributeError:
628 pass
629 else:
630 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000631 def close(self):
632 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000633 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000634 def flush(self):
635 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000636 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000637 with support.check_warnings(('', ResourceWarning)):
638 f = MyFileIO(support.TESTFN, "wb")
639 f.write(b"xxx")
640 del f
641 support.gc_collect()
642 self.assertEqual(record, [1, 2, 3])
643 with self.open(support.TESTFN, "rb") as f:
644 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000645
646 def _check_base_destructor(self, base):
647 record = []
648 class MyIO(base):
649 def __init__(self):
650 # This exercises the availability of attributes on object
651 # destruction.
652 # (in the C version, close() is called by the tp_dealloc
653 # function, not by __del__)
654 self.on_del = 1
655 self.on_close = 2
656 self.on_flush = 3
657 def __del__(self):
658 record.append(self.on_del)
659 try:
660 f = super().__del__
661 except AttributeError:
662 pass
663 else:
664 f()
665 def close(self):
666 record.append(self.on_close)
667 super().close()
668 def flush(self):
669 record.append(self.on_flush)
670 super().flush()
671 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000672 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000673 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000674 self.assertEqual(record, [1, 2, 3])
675
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000676 def test_IOBase_destructor(self):
677 self._check_base_destructor(self.IOBase)
678
679 def test_RawIOBase_destructor(self):
680 self._check_base_destructor(self.RawIOBase)
681
682 def test_BufferedIOBase_destructor(self):
683 self._check_base_destructor(self.BufferedIOBase)
684
685 def test_TextIOBase_destructor(self):
686 self._check_base_destructor(self.TextIOBase)
687
Guido van Rossum87429772007-04-10 21:06:59 +0000688 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000689 with self.open(support.TESTFN, "wb") as f:
690 f.write(b"xxx")
691 with self.open(support.TESTFN, "rb") as f:
692 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000693
Guido van Rossumd4103952007-04-12 05:44:49 +0000694 def test_array_writes(self):
695 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000696 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000697 def check(f):
698 with f:
699 self.assertEqual(f.write(a), n)
700 f.writelines((a,))
701 check(self.BytesIO())
702 check(self.FileIO(support.TESTFN, "w"))
703 check(self.BufferedWriter(self.MockRawIO()))
704 check(self.BufferedRandom(self.MockRawIO()))
705 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000706
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000707 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000708 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000709 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000710
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000711 def test_read_closed(self):
712 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000713 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000714 with self.open(support.TESTFN, "r") as f:
715 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000716 self.assertEqual(file.read(), "egg\n")
717 file.seek(0)
718 file.close()
719 self.assertRaises(ValueError, file.read)
720
721 def test_no_closefd_with_filename(self):
722 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000723 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000724
725 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000726 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000727 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000728 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000729 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000730 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000731 self.assertEqual(file.buffer.raw.closefd, False)
732
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000733 def test_garbage_collection(self):
734 # FileIO objects are collected, and collecting them flushes
735 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000736 with support.check_warnings(('', ResourceWarning)):
737 f = self.FileIO(support.TESTFN, "wb")
738 f.write(b"abcxxx")
739 f.f = f
740 wr = weakref.ref(f)
741 del f
742 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300743 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000744 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000745 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000746
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000747 def test_unbounded_file(self):
748 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
749 zero = "/dev/zero"
750 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000751 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000752 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000753 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000754 if support.real_max_memuse < support._2G:
Victor Stinner8c663fd2017-11-08 14:44:44 -0800755 self.skipTest("test requires at least 2 GiB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000756 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000757 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000758 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000759 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000760 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000761 self.assertRaises(OverflowError, f.read)
762
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200763 def check_flush_error_on_close(self, *args, **kwargs):
764 # Test that the file is closed despite failed flush
765 # and that flush() is called before file closed.
766 f = self.open(*args, **kwargs)
767 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000768 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200769 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200770 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000771 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200772 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600773 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200774 self.assertTrue(closed) # flush() called
775 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200776 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200777
778 def test_flush_error_on_close(self):
779 # raw file
780 # Issue #5700: io.FileIO calls flush() after file closed
781 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
782 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
783 self.check_flush_error_on_close(fd, 'wb', buffering=0)
784 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
785 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
786 os.close(fd)
787 # buffered io
788 self.check_flush_error_on_close(support.TESTFN, 'wb')
789 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
790 self.check_flush_error_on_close(fd, 'wb')
791 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
792 self.check_flush_error_on_close(fd, 'wb', closefd=False)
793 os.close(fd)
794 # text io
795 self.check_flush_error_on_close(support.TESTFN, 'w')
796 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
797 self.check_flush_error_on_close(fd, 'w')
798 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
799 self.check_flush_error_on_close(fd, 'w', closefd=False)
800 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000801
802 def test_multi_close(self):
803 f = self.open(support.TESTFN, "wb", buffering=0)
804 f.close()
805 f.close()
806 f.close()
807 self.assertRaises(ValueError, f.flush)
808
Antoine Pitrou328ec742010-09-14 18:37:24 +0000809 def test_RawIOBase_read(self):
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530810 # Exercise the default limited RawIOBase.read(n) implementation (which
811 # calls readinto() internally).
Antoine Pitrou328ec742010-09-14 18:37:24 +0000812 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
813 self.assertEqual(rawio.read(2), b"ab")
814 self.assertEqual(rawio.read(2), b"c")
815 self.assertEqual(rawio.read(2), b"d")
816 self.assertEqual(rawio.read(2), None)
817 self.assertEqual(rawio.read(2), b"ef")
818 self.assertEqual(rawio.read(2), b"g")
819 self.assertEqual(rawio.read(2), None)
820 self.assertEqual(rawio.read(2), b"")
821
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400822 def test_types_have_dict(self):
823 test = (
824 self.IOBase(),
825 self.RawIOBase(),
826 self.TextIOBase(),
827 self.StringIO(),
828 self.BytesIO()
829 )
830 for obj in test:
831 self.assertTrue(hasattr(obj, "__dict__"))
832
Ross Lagerwall59142db2011-10-31 20:34:46 +0200833 def test_opener(self):
834 with self.open(support.TESTFN, "w") as f:
835 f.write("egg\n")
836 fd = os.open(support.TESTFN, os.O_RDONLY)
837 def opener(path, flags):
838 return fd
839 with self.open("non-existent", "r", opener=opener) as f:
840 self.assertEqual(f.read(), "egg\n")
841
Barry Warsaw480e2852016-06-08 17:47:26 -0400842 def test_bad_opener_negative_1(self):
843 # Issue #27066.
844 def badopener(fname, flags):
845 return -1
846 with self.assertRaises(ValueError) as cm:
847 open('non-existent', 'r', opener=badopener)
848 self.assertEqual(str(cm.exception), 'opener returned -1')
849
850 def test_bad_opener_other_negative(self):
851 # Issue #27066.
852 def badopener(fname, flags):
853 return -2
854 with self.assertRaises(ValueError) as cm:
855 open('non-existent', 'r', opener=badopener)
856 self.assertEqual(str(cm.exception), 'opener returned -2')
857
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200858 def test_fileio_closefd(self):
859 # Issue #4841
860 with self.open(__file__, 'rb') as f1, \
861 self.open(__file__, 'rb') as f2:
862 fileio = self.FileIO(f1.fileno(), closefd=False)
863 # .__init__() must not close f1
864 fileio.__init__(f2.fileno(), closefd=False)
865 f1.readline()
866 # .close() must not close f2
867 fileio.close()
868 f2.readline()
869
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300870 def test_nonbuffered_textio(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200871 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300872 with self.assertRaises(ValueError):
873 self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300874
875 def test_invalid_newline(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200876 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300877 with self.assertRaises(ValueError):
878 self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300879
Martin Panter6bb91f32016-05-28 00:41:57 +0000880 def test_buffered_readinto_mixin(self):
881 # Test the implementation provided by BufferedIOBase
882 class Stream(self.BufferedIOBase):
883 def read(self, size):
884 return b"12345"
885 read1 = read
886 stream = Stream()
887 for method in ("readinto", "readinto1"):
888 with self.subTest(method):
889 buffer = byteslike(5)
890 self.assertEqual(getattr(stream, method)(buffer), 5)
891 self.assertEqual(bytes(buffer), b"12345")
892
Ethan Furmand62548a2016-06-04 14:38:43 -0700893 def test_fspath_support(self):
894 class PathLike:
895 def __init__(self, path):
896 self.path = path
897
898 def __fspath__(self):
899 return self.path
900
901 def check_path_succeeds(path):
902 with self.open(path, "w") as f:
903 f.write("egg\n")
904
905 with self.open(path, "r") as f:
906 self.assertEqual(f.read(), "egg\n")
907
908 check_path_succeeds(PathLike(support.TESTFN))
909 check_path_succeeds(PathLike(support.TESTFN.encode('utf-8')))
910
911 bad_path = PathLike(TypeError)
Brett Cannonc78ca1e2016-06-24 12:03:43 -0700912 with self.assertRaises(TypeError):
Ethan Furmand62548a2016-06-04 14:38:43 -0700913 self.open(bad_path, 'w')
914
915 # ensure that refcounting is correct with some error conditions
916 with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
917 self.open(PathLike(support.TESTFN), 'rwxa')
918
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530919 def test_RawIOBase_readall(self):
920 # Exercise the default unlimited RawIOBase.read() and readall()
921 # implementations.
922 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
923 self.assertEqual(rawio.read(), b"abcdefg")
924 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
925 self.assertEqual(rawio.readall(), b"abcdefg")
926
927 def test_BufferedIOBase_readinto(self):
928 # Exercise the default BufferedIOBase.readinto() and readinto1()
929 # implementations (which call read() or read1() internally).
930 class Reader(self.BufferedIOBase):
931 def __init__(self, avail):
932 self.avail = avail
933 def read(self, size):
934 result = self.avail[:size]
935 self.avail = self.avail[size:]
936 return result
937 def read1(self, size):
938 """Returns no more than 5 bytes at once"""
939 return self.read(min(size, 5))
940 tests = (
941 # (test method, total data available, read buffer size, expected
942 # read size)
943 ("readinto", 10, 5, 5),
944 ("readinto", 10, 6, 6), # More than read1() can return
945 ("readinto", 5, 6, 5), # Buffer larger than total available
946 ("readinto", 6, 7, 6),
947 ("readinto", 10, 0, 0), # Empty buffer
948 ("readinto1", 10, 5, 5), # Result limited to single read1() call
949 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return
950 ("readinto1", 5, 6, 5), # Buffer larger than total available
951 ("readinto1", 6, 7, 5),
952 ("readinto1", 10, 0, 0), # Empty buffer
953 )
954 UNUSED_BYTE = 0x81
955 for test in tests:
956 with self.subTest(test):
957 method, avail, request, result = test
958 reader = Reader(bytes(range(avail)))
959 buffer = bytearray((UNUSED_BYTE,) * request)
960 method = getattr(reader, method)
961 self.assertEqual(method(buffer), result)
962 self.assertEqual(len(buffer), request)
963 self.assertSequenceEqual(buffer[:result], range(result))
964 unused = (UNUSED_BYTE,) * (request - result)
965 self.assertSequenceEqual(buffer[result:], unused)
966 self.assertEqual(len(reader.avail), avail - result)
967
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200968
class CIOTest(IOTest):
    # Runs the shared IOTest cases and adds one finalization regression test.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        instance = MyIO()
        instance.obj = instance  # put the instance in a reference cycle
        ref = weakref.ref(instance)
        del MyIO, instance
        support.gc_collect()
        self.assertIsNone(ref(), ref)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000988
class PyIOTest(IOTest):
    """Runs the inherited IOTest cases unchanged; adds no tests of its own."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000991
Guido van Rossuma9e20242007-03-08 00:43:48 +0000992
@support.cpython_only
class APIMismatchTest(unittest.TestCase):
    # Cross-checks io.RawIOBase against pyio.RawIOBase in both directions
    # using support.detect_api_mismatch().

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        missing = support.detect_api_mismatch(
            pyio.RawIOBase, io.RawIOBase, ignore=('__weakref__',))
        self.assertEqual(
            missing, set(),
            msg='Python RawIOBase does not have all C RawIOBase methods')

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        missing = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        self.assertEqual(
            missing, set(),
            msg='C RawIOBase does not have all Python RawIOBase methods')
1006
1007
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom

    def test_detach(self):
        # detach() hands back the raw object once; a second call fails.
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        self.assertIs(buffered.detach(), raw)
        with self.assertRaises(ValueError):
            buffered.detach()

        repr(buffered)  # Should still work

    def test_fileno(self):
        # fileno() of the buffered object is expected to be 42 when wrapping
        # the mock raw object.
        bufio = self.tp(self.MockRawIO())

        self.assertEqual(42, bufio.fileno())

    def test_invalid_args(self):
        bufio = self.tp(self.MockRawIO())
        # Invalid whence
        for whence in (-1, 9):
            with self.assertRaises(ValueError):
                bufio.seek(0, whence)

    def test_override_destructor(self):
        # Destroying an instance of a subclass that overrides __del__,
        # close() and flush() must call them in that order.
        calls = []

        class MyBufferedIO(self.tp):
            def __del__(self):
                calls.append(1)
                try:
                    parent_del = super().__del__
                except AttributeError:
                    pass
                else:
                    parent_del()

            def close(self):
                calls.append(2)
                super().close()

            def flush(self):
                calls.append(3)
                super().flush()

        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        del bufio
        support.gc_collect()
        self.assertEqual(calls, [1, 2, 3])

    def test_context_manager(self):
        # Test usability as a context manager
        bufio = self.tp(self.MockRawIO())
        with bufio:
            pass
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        with self.assertRaises(ValueError):
            with bufio:
                pass

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()

        def access_missing_attribute():
            self.tp(rawio).xyzzy

        with support.captured_output("stderr") as stderr:
            with self.assertRaises(AttributeError):
                access_missing_attribute()
        output = stderr.getvalue().strip()
        if output:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(output.splitlines()), 1)
            self.assertTrue(output.startswith("Exception OSError: "), output)
            self.assertTrue(output.endswith(" ignored"), output)

    def test_repr(self):
        # repr() shows the qualified class name, plus the raw object's name
        # once it has one (str or bytes).
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
        self.assertEqual(repr(buffered), "<%s>" % clsname)
        named_cases = (
            ("dummy", "<%s name='dummy'>"),
            (b"dummy", "<%s name=b'dummy'>"),
        )
        for name, template in named_cases:
            raw.name = name
            self.assertEqual(repr(buffered), template % clsname)

    def test_recursive_repr(self):
        # Issue #25455
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        # The raw object's name is the buffered object itself, so repr()
        # recurses.
        with support.swap_attr(raw, 'name', buffered):
            try:
                repr(buffered)  # Should not crash
            except RuntimeError:
                pass

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw = self.MockRawIO()
        closed_at_flush = []

        def failing_flush():
            closed_at_flush[:] = [buffered.closed, raw.closed]
            raise OSError()

        raw.flush = failing_flush
        buffered = self.tp(raw)
        with self.assertRaises(OSError):  # exception not swallowed
            buffered.close()
        self.assertTrue(buffered.closed)
        self.assertTrue(raw.closed)
        self.assertTrue(closed_at_flush)  # flush() called
        self.assertFalse(closed_at_flush[0])  # flush() called before file closed
        self.assertFalse(closed_at_flush[1])
        raw.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and the raw close() fail, the close error is
        # raised with the flush error as its __context__.
        raw = self.MockRawIO()

        def bad_flush():
            raise OSError('flush')

        def bad_close():
            raise OSError('close')

        raw.close = bad_close
        buffered = self.tp(raw)
        buffered.flush = bad_flush
        with self.assertRaises(OSError) as err:  # exception not swallowed
            buffered.close()
        raised = err.exception
        self.assertEqual(raised.args, ('close',))
        self.assertIsInstance(raised.__context__, OSError)
        self.assertEqual(raised.__context__.args, ('flush',))
        self.assertFalse(buffered.closed)

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        raw = self.MockRawIO()

        # Both helpers deliberately reference undefined names so that
        # calling them raises NameError.
        def bad_flush():
            raise non_existing_flush

        def bad_close():
            raise non_existing_close

        raw.close = bad_close
        buffered = self.tp(raw)
        buffered.flush = bad_flush
        with self.assertRaises(NameError) as err:  # exception not swallowed
            buffered.close()
        raised = err.exception
        self.assertIn('non_existing_close', str(raised))
        self.assertIsInstance(raised.__context__, NameError)
        self.assertIn('non_existing_flush', str(raised.__context__))
        self.assertFalse(buffered.closed)

    def test_multi_close(self):
        # close() can be repeated; flush() afterwards raises ValueError.
        buffered = self.tp(self.MockRawIO())
        for _ in range(3):
            buffered.close()
        with self.assertRaises(ValueError):
            buffered.flush()

    def test_unseekable(self):
        # tell() and seek() on an unseekable raw stream raise
        # UnsupportedOperation.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        with self.assertRaises(self.UnsupportedOperation):
            bufio.tell()
        with self.assertRaises(self.UnsupportedOperation):
            bufio.seek(0)

    def test_readonly_attributes(self):
        # The raw attribute cannot be reassigned.
        buf = self.tp(self.MockRawIO())
        replacement = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buf.raw = replacement
1173
Guido van Rossum78892e42007-04-06 17:31:18 +00001174
class SizeofTest:
    # sys.getsizeof() checks for buffered objects built through self.tp.

    @support.cpython_only
    def test_sizeof(self):
        # The reported size grows by exactly the difference in buffer_size.
        small, large = 4096, 8192
        bufio = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(bufio) - small
        bufio = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(bufio), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the reported size no longer includes the buffer.
        bufsize = 4096
        bufio = self.tp(self.MockRawIO(), buffer_size=bufsize)
        overhead = sys.getsizeof(bufio) - bufsize
        bufio.close()
        self.assertEqual(sys.getsizeof(bufio), overhead)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001196
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001197class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1198 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001199
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001200 def test_constructor(self):
1201 rawio = self.MockRawIO([b"abc"])
1202 bufio = self.tp(rawio)
1203 bufio.__init__(rawio)
1204 bufio.__init__(rawio, buffer_size=1024)
1205 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001206 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001207 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1208 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1209 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1210 rawio = self.MockRawIO([b"abc"])
1211 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001212 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001213
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001214 def test_uninitialized(self):
1215 bufio = self.tp.__new__(self.tp)
1216 del bufio
1217 bufio = self.tp.__new__(self.tp)
1218 self.assertRaisesRegex((ValueError, AttributeError),
1219 'uninitialized|has no attribute',
1220 bufio.read, 0)
1221 bufio.__init__(self.MockRawIO())
1222 self.assertEqual(bufio.read(0), b'')
1223
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001224 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001225 for arg in (None, 7):
1226 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1227 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001228 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001229 # Invalid args
1230 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001231
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001232 def test_read1(self):
1233 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1234 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001235 self.assertEqual(b"a", bufio.read(1))
1236 self.assertEqual(b"b", bufio.read1(1))
1237 self.assertEqual(rawio._reads, 1)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001238 self.assertEqual(b"", bufio.read1(0))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001239 self.assertEqual(b"c", bufio.read1(100))
1240 self.assertEqual(rawio._reads, 1)
1241 self.assertEqual(b"d", bufio.read1(100))
1242 self.assertEqual(rawio._reads, 2)
1243 self.assertEqual(b"efg", bufio.read1(100))
1244 self.assertEqual(rawio._reads, 3)
1245 self.assertEqual(b"", bufio.read1(100))
1246 self.assertEqual(rawio._reads, 4)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001247
1248 def test_read1_arbitrary(self):
1249 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1250 bufio = self.tp(rawio)
1251 self.assertEqual(b"a", bufio.read(1))
1252 self.assertEqual(b"bc", bufio.read1())
1253 self.assertEqual(b"d", bufio.read1())
1254 self.assertEqual(b"efg", bufio.read1(-1))
1255 self.assertEqual(rawio._reads, 3)
1256 self.assertEqual(b"", bufio.read1())
1257 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001258
1259 def test_readinto(self):
1260 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1261 bufio = self.tp(rawio)
1262 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001263 self.assertEqual(bufio.readinto(b), 2)
1264 self.assertEqual(b, b"ab")
1265 self.assertEqual(bufio.readinto(b), 2)
1266 self.assertEqual(b, b"cd")
1267 self.assertEqual(bufio.readinto(b), 2)
1268 self.assertEqual(b, b"ef")
1269 self.assertEqual(bufio.readinto(b), 1)
1270 self.assertEqual(b, b"gf")
1271 self.assertEqual(bufio.readinto(b), 0)
1272 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +02001273 rawio = self.MockRawIO((b"abc", None))
1274 bufio = self.tp(rawio)
1275 self.assertEqual(bufio.readinto(b), 2)
1276 self.assertEqual(b, b"ab")
1277 self.assertEqual(bufio.readinto(b), 1)
1278 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001279
Benjamin Petersona96fea02014-06-22 14:17:44 -07001280 def test_readinto1(self):
1281 buffer_size = 10
1282 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1283 bufio = self.tp(rawio, buffer_size=buffer_size)
1284 b = bytearray(2)
1285 self.assertEqual(bufio.peek(3), b'abc')
1286 self.assertEqual(rawio._reads, 1)
1287 self.assertEqual(bufio.readinto1(b), 2)
1288 self.assertEqual(b, b"ab")
1289 self.assertEqual(rawio._reads, 1)
1290 self.assertEqual(bufio.readinto1(b), 1)
1291 self.assertEqual(b[:1], b"c")
1292 self.assertEqual(rawio._reads, 1)
1293 self.assertEqual(bufio.readinto1(b), 2)
1294 self.assertEqual(b, b"de")
1295 self.assertEqual(rawio._reads, 2)
1296 b = bytearray(2*buffer_size)
1297 self.assertEqual(bufio.peek(3), b'fgh')
1298 self.assertEqual(rawio._reads, 3)
1299 self.assertEqual(bufio.readinto1(b), 6)
1300 self.assertEqual(b[:6], b"fghjkl")
1301 self.assertEqual(rawio._reads, 4)
1302
1303 def test_readinto_array(self):
1304 buffer_size = 60
1305 data = b"a" * 26
1306 rawio = self.MockRawIO((data,))
1307 bufio = self.tp(rawio, buffer_size=buffer_size)
1308
1309 # Create an array with element size > 1 byte
1310 b = array.array('i', b'x' * 32)
1311 assert len(b) != 16
1312
1313 # Read into it. We should get as many *bytes* as we can fit into b
1314 # (which is more than the number of elements)
1315 n = bufio.readinto(b)
1316 self.assertGreater(n, len(b))
1317
1318 # Check that old contents of b are preserved
1319 bm = memoryview(b).cast('B')
1320 self.assertLess(n, len(bm))
1321 self.assertEqual(bm[:n], data[:n])
1322 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1323
1324 def test_readinto1_array(self):
1325 buffer_size = 60
1326 data = b"a" * 26
1327 rawio = self.MockRawIO((data,))
1328 bufio = self.tp(rawio, buffer_size=buffer_size)
1329
1330 # Create an array with element size > 1 byte
1331 b = array.array('i', b'x' * 32)
1332 assert len(b) != 16
1333
1334 # Read into it. We should get as many *bytes* as we can fit into b
1335 # (which is more than the number of elements)
1336 n = bufio.readinto1(b)
1337 self.assertGreater(n, len(b))
1338
1339 # Check that old contents of b are preserved
1340 bm = memoryview(b).cast('B')
1341 self.assertLess(n, len(bm))
1342 self.assertEqual(bm[:n], data[:n])
1343 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1344
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001345 def test_readlines(self):
1346 def bufio():
1347 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1348 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001349 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1350 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1351 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001352
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001353 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001354 data = b"abcdefghi"
1355 dlen = len(data)
1356
1357 tests = [
1358 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1359 [ 100, [ 3, 3, 3], [ dlen ] ],
1360 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1361 ]
1362
1363 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001364 rawio = self.MockFileIO(data)
1365 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001366 pos = 0
1367 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001368 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001369 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001370 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001371 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001372
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001373 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +00001374 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001375 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1376 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001377 self.assertEqual(b"abcd", bufio.read(6))
1378 self.assertEqual(b"e", bufio.read(1))
1379 self.assertEqual(b"fg", bufio.read())
1380 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +02001381 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001382 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +00001383
Victor Stinnera80987f2011-05-25 22:47:16 +02001384 rawio = self.MockRawIO((b"a", None, None))
1385 self.assertEqual(b"a", rawio.readall())
1386 self.assertIsNone(rawio.readall())
1387
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001388 def test_read_past_eof(self):
1389 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1390 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001391
Ezio Melottib3aedd42010-11-20 19:04:17 +00001392 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001393
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001394 def test_read_all(self):
1395 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1396 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001397
Ezio Melottib3aedd42010-11-20 19:04:17 +00001398 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001399
@support.requires_resource('cpu')
def test_threads(self):
    # Read one shared buffered reader from many threads at once and check
    # that every byte of the file is handed out exactly once.
    try:
        # The file holds exactly the same number of 0's, 1's... 255's,
        # so counting each byte value afterwards reveals any content that
        # concurrent reading duplicated or forgot.
        N = 1000
        values = list(range(256)) * N
        random.shuffle(values)
        payload = bytes(bytearray(values))
        with self.open(support.TESTFN, "wb") as out:
            out.write(payload)
        with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            errors = []
            results = []

            def reader():
                try:
                    # Intra-buffer read then buffer-flushing read
                    for size in cycle([1, 19]):
                        chunk = bufio.read(size)
                        if not chunk:
                            break
                        # list.append() is atomic
                        results.append(chunk)
                except Exception as exc:
                    # Record the failure for the main thread, then let it
                    # propagate inside the worker as before.
                    errors.append(exc)
                    raise

            threads = [threading.Thread(target=reader) for _ in range(20)]
            with support.start_threads(threads):
                time.sleep(0.02)  # yield
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            combined = b''.join(results)
            for value in range(256):
                self.assertEqual(combined.count(bytes([value])), N)
    finally:
        support.unlink(support.TESTFN)
1439
def test_unseekable(self):
    # tell() and seek() must be refused on top of an unseekable raw
    # stream, both before and after data has been read through the buffer.
    bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
    with self.assertRaises(self.UnsupportedOperation):
        bufio.tell()
    with self.assertRaises(self.UnsupportedOperation):
        bufio.seek(0)
    bufio.read(1)
    with self.assertRaises(self.UnsupportedOperation):
        bufio.seek(0)
    with self.assertRaises(self.UnsupportedOperation):
        bufio.tell()
1447
def test_misbehaved_io(self):
    # Positioning calls on top of a misbehaving raw stream are expected to
    # fail with OSError.
    bufio = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
    with self.assertRaises(OSError):
        bufio.seek(0)
    with self.assertRaises(OSError):
        bufio.tell()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001453
def test_no_extraneous_read(self):
    # Issue #9550; when the raw IO object has satisfied the read request,
    # we should not issue any additional reads, otherwise it may block
    # (e.g. socket).
    bufsize = 16

    def check(chunks, n):
        # Read n bytes through a fresh buffered reader and verify the raw
        # mock recorded no read beyond those needed.
        rawio = self.MockRawIO(chunks)
        bufio = self.tp(rawio, bufsize)
        self.assertEqual(bufio.read(n), b"x" * n)
        self.assertEqual(rawio._extraneous_reads, 0,
            "failed for {}: {} != 0".format(n, rawio._extraneous_reads))

    for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
        # Simple case: one raw read is enough to satisfy the request.
        check([b"x" * n], n)
        # A more complex case where two raw reads are needed to satisfy
        # the request.
        check([b"x" * (n - 1), b"x"], n)
1473
Berker Peksagd10d6ae2015-05-12 17:01:05 +03001474 def test_read_on_closed(self):
1475 # Issue #23796
1476 b = io.BufferedReader(io.BytesIO(b"12"))
1477 b.read(1)
1478 b.close()
1479 self.assertRaises(ValueError, b.peek)
1480 self.assertRaises(ValueError, b.read1, 1)
1481
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001482
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the shared BufferedReader tests against io.BufferedReader and
    # adds checks that only apply to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        reader = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            reader.__init__(raw, sys.maxsize)

    def test_initialization(self):
        # Re-initialising with an invalid buffer size must fail, and the
        # object must refuse to read afterwards.
        raw = self.MockRawIO([b"abc"])
        reader = self.tp(raw)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                reader.__init__(raw, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                reader.read()

    def test_misbehaved_io_read(self):
        raw = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        with self.assertRaises(OSError):
            reader.read(10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        with support.check_warnings(('', ResourceWarning)):
            raw = self.FileIO(support.TESTFN, "w+b")
            reader = self.tp(raw)
            # Self-reference, so the object is part of a cycle once the
            # local name is dropped.
            reader.f = reader
            wr = weakref.ref(reader)
            del reader
            support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError text must name the
        # class.
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1529
1530
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared BufferedReader tests against the pyio implementation.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001533
Guido van Rossuma9e20242007-03-08 00:43:48 +00001534
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Shared tests for buffered writers.  Subclasses select the
    # implementation under test through ``tp``; the mock raw streams and
    # ``open`` are looked up on ``self``.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be called again with valid sizes; invalid sizes
        # raise ValueError, and a later valid __init__ makes the object
        # usable again.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_uninitialized(self):
        # An object made with __new__ alone must be safe to discard, must
        # reject writes, and must work once __init__ has run.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.write, b'')
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.write(b''), 0)

    def test_detach_flush(self):
        # detach() must push pending buffered data to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)
        buffer = bytearray(b"def")
        bufio.write(buffer)
        buffer[:] = b"***"  # Overwrite our copy of the data
        bufio.flush()
        self.assertEqual(b"".join(writer._write_stack), b"abcdef")

    def test_write_overflow(self):
        # Writing more than the buffer holds forces implicit flushes.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is called after every single write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seeking away and back between writes must not disturb the output.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Seeking back and overwriting must land in the right place, and
        # later writes must append after the seek forward.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # writelines() must accept a list-like object, not just a list.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        # Non-bytes items, None and str are all rejected with TypeError.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)
        self.assertRaises(TypeError, bufio.writelines, 'abc')

    def test_destructor(self):
        # Dropping the last reference must flush buffered data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    def test_truncate_after_write(self):
        # Ensure that truncate preserves the file position after
        # writes longer than the buffer size.
        # Issue: https://bugs.python.org/issue32228
        with self.open(support.TESTFN, "wb") as f:
            # Fill with some buffer
            f.write(b'\x00' * 10000)
        buffer_sizes = [8192, 4096, 200]
        for buffer_size in buffer_sizes:
            with self.open(support.TESTFN, "r+b", buffering=buffer_size) as f:
                f.write(b'\x00' * (buffer_size + 1))
                # After write write_pos and write_end are set to 0
                f.read(1)
                # read operation makes sure that pos != raw_pos
                f.truncate()
                self.assertEqual(f.tell(), buffer_size + 2)

    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02)  # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)
        self.assertRaises(OSError, bufio.write, b"abcdef")

    def test_max_buffer_size_removal(self):
        # A third positional argument is no longer accepted.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()
        def bad_write(b):
            raise OSError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(OSError, b.close)  # exception not swallowed
        self.assertTrue(b.closed)

    def test_slow_close_from_thread(self):
        # Issue #31976
        # While another thread is inside close(), a write from this thread
        # must raise ValueError and the object must already report closed.
        rawio = self.SlowFlushRawIO()
        bufio = self.tp(rawio, 8)
        t = threading.Thread(target=bufio.close)
        t.start()
        try:
            rawio.in_flush.wait()
            self.assertRaises(ValueError, bufio.write, b'spam')
            self.assertTrue(bufio.closed)
        finally:
            # Join even when an assertion above fails, so a failing run
            # does not leave the closer thread dangling behind the test.
            t.join()
1821
1822
Benjamin Peterson59406a92009-03-26 17:10:29 +00001823
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared BufferedWriter tests against io.BufferedWriter and
    # adds checks that only apply to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        writer = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            writer.__init__(raw, sys.maxsize)

    def test_initialization(self):
        # Re-initialising with an invalid buffer size must fail, and the
        # object must refuse to write afterwards.
        raw = self.MockRawIO()
        writer = self.tp(raw)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                writer.__init__(raw, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                writer.write(b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        with support.check_warnings(('', ResourceWarning)):
            raw = self.FileIO(support.TESTFN, "w+b")
            writer = self.tp(raw)
            writer.write(b"123xxx")
            # Self-reference, so the object is part of a cycle once the
            # local name is dropped.
            writer.x = writer
            wr = weakref.ref(writer)
            del writer
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as saved:
            self.assertEqual(saved.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError text must name the
        # class.
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1867
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001868
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriter tests against the pyio implementation.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001871
Guido van Rossum01a27522007-03-07 01:00:12 +00001872class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001873
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001874 def test_constructor(self):
1875 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001876 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001877
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001878 def test_uninitialized(self):
1879 pair = self.tp.__new__(self.tp)
1880 del pair
1881 pair = self.tp.__new__(self.tp)
1882 self.assertRaisesRegex((ValueError, AttributeError),
1883 'uninitialized|has no attribute',
1884 pair.read, 0)
1885 self.assertRaisesRegex((ValueError, AttributeError),
1886 'uninitialized|has no attribute',
1887 pair.write, b'')
1888 pair.__init__(self.MockRawIO(), self.MockRawIO())
1889 self.assertEqual(pair.read(0), b'')
1890 self.assertEqual(pair.write(b''), 0)
1891
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001892 def test_detach(self):
1893 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1894 self.assertRaises(self.UnsupportedOperation, pair.detach)
1895
Florent Xicluna109d5732012-07-07 17:03:22 +02001896 def test_constructor_max_buffer_size_removal(self):
1897 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001898 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001899
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001900 def test_constructor_with_not_readable(self):
1901 class NotReadable(MockRawIO):
1902 def readable(self):
1903 return False
1904
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001905 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001906
1907 def test_constructor_with_not_writeable(self):
1908 class NotWriteable(MockRawIO):
1909 def writable(self):
1910 return False
1911
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001912 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001913
1914 def test_read(self):
1915 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1916
1917 self.assertEqual(pair.read(3), b"abc")
1918 self.assertEqual(pair.read(1), b"d")
1919 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001920 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1921 self.assertEqual(pair.read(None), b"abc")
1922
1923 def test_readlines(self):
1924 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1925 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1926 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1927 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001928
1929 def test_read1(self):
1930 # .read1() is delegated to the underlying reader object, so this test
1931 # can be shallow.
1932 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1933
1934 self.assertEqual(pair.read1(3), b"abc")
Martin Panterccb2c0e2016-10-20 23:48:14 +00001935 self.assertEqual(pair.read1(), b"def")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001936
1937 def test_readinto(self):
Martin Panter6bb91f32016-05-28 00:41:57 +00001938 for method in ("readinto", "readinto1"):
1939 with self.subTest(method):
1940 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001941
Serhiy Storchaka5f1a5182016-09-11 14:41:02 +03001942 data = byteslike(b'\0' * 5)
Martin Panter6bb91f32016-05-28 00:41:57 +00001943 self.assertEqual(getattr(pair, method)(data), 5)
1944 self.assertEqual(bytes(data), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001945
1946 def test_write(self):
1947 w = self.MockRawIO()
1948 pair = self.tp(self.MockRawIO(), w)
1949
1950 pair.write(b"abc")
1951 pair.flush()
Martin Panter6bb91f32016-05-28 00:41:57 +00001952 buffer = bytearray(b"def")
1953 pair.write(buffer)
1954 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001955 pair.flush()
1956 self.assertEqual(w._write_stack, [b"abc", b"def"])
1957
1958 def test_peek(self):
1959 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1960
1961 self.assertTrue(pair.peek(3).startswith(b"abc"))
1962 self.assertEqual(pair.read(3), b"abc")
1963
1964 def test_readable(self):
1965 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1966 self.assertTrue(pair.readable())
1967
1968 def test_writeable(self):
1969 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1970 self.assertTrue(pair.writable())
1971
1972 def test_seekable(self):
1973 # BufferedRWPairs are never seekable, even if their readers and writers
1974 # are.
1975 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1976 self.assertFalse(pair.seekable())
1977
1978 # .flush() is delegated to the underlying writer object and has been
1979 # tested in the test_write method.
1980
1981 def test_close_and_closed(self):
1982 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1983 self.assertFalse(pair.closed)
1984 pair.close()
1985 self.assertTrue(pair.closed)
1986
Serhiy Storchaka7665be62015-03-24 23:21:57 +02001987 def test_reader_close_error_on_close(self):
1988 def reader_close():
1989 reader_non_existing
1990 reader = self.MockRawIO()
1991 reader.close = reader_close
1992 writer = self.MockRawIO()
1993 pair = self.tp(reader, writer)
1994 with self.assertRaises(NameError) as err:
1995 pair.close()
1996 self.assertIn('reader_non_existing', str(err.exception))
1997 self.assertTrue(pair.closed)
1998 self.assertFalse(reader.closed)
1999 self.assertTrue(writer.closed)
2000
2001 def test_writer_close_error_on_close(self):
2002 def writer_close():
2003 writer_non_existing
2004 reader = self.MockRawIO()
2005 writer = self.MockRawIO()
2006 writer.close = writer_close
2007 pair = self.tp(reader, writer)
2008 with self.assertRaises(NameError) as err:
2009 pair.close()
2010 self.assertIn('writer_non_existing', str(err.exception))
2011 self.assertFalse(pair.closed)
2012 self.assertTrue(reader.closed)
2013 self.assertFalse(writer.closed)
2014
2015 def test_reader_writer_close_error_on_close(self):
2016 def reader_close():
2017 reader_non_existing
2018 def writer_close():
2019 writer_non_existing
2020 reader = self.MockRawIO()
2021 reader.close = reader_close
2022 writer = self.MockRawIO()
2023 writer.close = writer_close
2024 pair = self.tp(reader, writer)
2025 with self.assertRaises(NameError) as err:
2026 pair.close()
2027 self.assertIn('reader_non_existing', str(err.exception))
2028 self.assertIsInstance(err.exception.__context__, NameError)
2029 self.assertIn('writer_non_existing', str(err.exception.__context__))
2030 self.assertFalse(pair.closed)
2031 self.assertFalse(reader.closed)
2032 self.assertFalse(writer.closed)
2033
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002034 def test_isatty(self):
2035 class SelectableIsAtty(MockRawIO):
2036 def __init__(self, isatty):
2037 MockRawIO.__init__(self)
2038 self._isatty = isatty
2039
2040 def isatty(self):
2041 return self._isatty
2042
2043 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
2044 self.assertFalse(pair.isatty())
2045
2046 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
2047 self.assertTrue(pair.isatty())
2048
2049 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
2050 self.assertTrue(pair.isatty())
2051
2052 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
2053 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00002054
Benjamin Petersonbbd0a322014-09-29 22:46:57 -04002055 def test_weakref_clearing(self):
2056 brw = self.tp(self.MockRawIO(), self.MockRawIO())
2057 ref = weakref.ref(brw)
2058 brw = None
2059 ref = None # Shouldn't segfault.
2060
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPair tests against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002063
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPair tests against the pyio implementation.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002066
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002067
2068class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
2069 read_mode = "rb+"
2070 write_mode = "wb+"
2071
def test_constructor(self):
    # Run the constructor checks of both parent test classes, reader
    # first, by calling each base explicitly.
    for base in (BufferedReaderTest, BufferedWriterTest):
        base.test_constructor(self)
2075
Serhiy Storchaka61e24932014-02-12 10:52:35 +02002076 def test_uninitialized(self):
2077 BufferedReaderTest.test_uninitialized(self)
2078 BufferedWriterTest.test_uninitialized(self)
2079
def test_read_and_write(self):
    # Writes issued between reads stay buffered; they have reached the
    # raw stream, as one piece, by the time the next read() has returned.
    raw = self.MockRawIO((b"asdf", b"ghjk"))
    stream = self.tp(raw, 8)

    self.assertEqual(b"as", stream.read(2))
    for piece in (b"ddd", b"eee"):
        stream.write(piece)
    self.assertFalse(raw._write_stack)  # Buffer writes
    self.assertEqual(b"ghjk", stream.read())
    self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00002090
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002091 def test_seek_and_tell(self):
2092 raw = self.BytesIO(b"asdfghjkl")
2093 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00002094
Ezio Melottib3aedd42010-11-20 19:04:17 +00002095 self.assertEqual(b"as", rw.read(2))
2096 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002097 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002098 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00002099
Antoine Pitroue05565e2011-08-20 14:39:23 +02002100 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00002101 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02002102 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002103 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002104 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002105 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002106 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002107 self.assertEqual(7, rw.tell())
2108 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02002109 rw.flush()
2110 self.assertEqual(b"asdf123fl", raw.getvalue())
2111
Christian Heimes8e42a0a2007-11-08 18:04:45 +00002112 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00002113
def check_flush_and_read(self, read_func):
    """Shared scenario mixing reads, a write and flushes.

    read_func is called as read_func(stream, size) or read_func(stream)
    and must return the bytes it consumed from the stream.
    """
    backing = self.BytesIO(b"abcdefghi")
    stream = self.tp(backing)

    self.assertEqual(b"ab", read_func(stream, 2))
    stream.write(b"12")
    self.assertEqual(b"ef", read_func(stream, 2))
    self.assertEqual(6, stream.tell())
    stream.flush()
    # Flushing must not move the logical position.
    self.assertEqual(6, stream.tell())
    self.assertEqual(b"ghi", read_func(stream))
    # Modify the raw stream behind the buffered object's back.
    backing.seek(0, 0)
    backing.write(b"XYZ")
    # flush() resets the read buffer
    stream.flush()
    stream.seek(0, 0)
    self.assertEqual(b"XYZ", read_func(stream, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002131
def test_flush_and_read(self):
    """Run the shared flush/read scenario through read()."""
    def _read(stream, *size):
        return stream.read(*size)
    self.check_flush_and_read(_read)
2134
def test_flush_and_readinto(self):
    """Run the shared flush/read scenario through readinto()."""
    def _read_via_readinto(stream, size=-1):
        # A negative size means "read everything": use a generously
        # sized scratch buffer.
        capacity = size if size >= 0 else 9999
        scratch = bytearray(capacity)
        filled = stream.readinto(scratch)
        return bytes(scratch[:filled])
    self.check_flush_and_read(_read_via_readinto)
2141
def test_flush_and_peek(self):
    """Run the shared flush/read scenario through peek() plus a seek."""
    def _peek_and_advance(stream, size=-1):
        # This relies on the fact that the buffer can contain the whole
        # raw stream, otherwise peek() can return less.
        data = stream.peek(size)
        data = data if size == -1 else data[:size]
        # Skip over what was peeked so the helper acts like a read.
        stream.seek(len(data), 1)
        return data
    self.check_flush_and_read(_peek_and_advance)
2152
2153 def test_flush_and_write(self):
2154 raw = self.BytesIO(b"abcdefghi")
2155 bufio = self.tp(raw)
2156
2157 bufio.write(b"123")
2158 bufio.flush()
2159 bufio.write(b"45")
2160 bufio.flush()
2161 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002162 self.assertEqual(b"12345fghi", raw.getvalue())
2163 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002164
def test_threads(self):
    """Run both the reader and the writer thread tests against this type."""
    for parent in (BufferedReaderTest, BufferedWriterTest):
        parent.test_threads(self)
2168
def test_writes_and_peek(self):
    """Run check_writes() with a peek() interleaved between writes."""
    def _peek_ahead(stream):
        stream.peek(1)
    self.check_writes(_peek_ahead)

    def _peek_behind(stream):
        # Step back one byte, peek, then restore the original position.
        saved_pos = stream.tell()
        stream.seek(-1, 1)
        stream.peek(1)
        stream.seek(saved_pos, 0)
    self.check_writes(_peek_behind)
2179
def test_writes_and_reads(self):
    """Run check_writes() re-reading the previous byte with read()."""
    def _reread_last_byte(stream):
        stream.seek(-1, 1)
        stream.read(1)
    self.check_writes(_reread_last_byte)
2185
def test_writes_and_read1s(self):
    """Run check_writes() re-reading the previous byte with read1()."""
    def _reread_last_byte(stream):
        stream.seek(-1, 1)
        stream.read1(1)
    self.check_writes(_reread_last_byte)
2191
def test_writes_and_readintos(self):
    """Run check_writes() re-reading the previous byte with readinto()."""
    def _reread_last_byte(stream):
        stream.seek(-1, 1)
        one_byte = bytearray(1)
        stream.readinto(one_byte)
    self.check_writes(_reread_last_byte)
2197
Antoine Pitroua0ceb732009-08-06 20:29:56 +00002198 def test_write_after_readahead(self):
2199 # Issue #6629: writing after the buffer was filled by readahead should
2200 # first rewind the raw stream.
2201 for overwrite_size in [1, 5]:
2202 raw = self.BytesIO(b"A" * 10)
2203 bufio = self.tp(raw, 4)
2204 # Trigger readahead
2205 self.assertEqual(bufio.read(1), b"A")
2206 self.assertEqual(bufio.tell(), 1)
2207 # Overwriting should rewind the raw stream if it needs so
2208 bufio.write(b"B" * overwrite_size)
2209 self.assertEqual(bufio.tell(), overwrite_size + 1)
2210 # If the write size was smaller than the buffer size, flush() and
2211 # check that rewind happens.
2212 bufio.flush()
2213 self.assertEqual(bufio.tell(), overwrite_size + 1)
2214 s = raw.getvalue()
2215 self.assertEqual(s,
2216 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2217
Antoine Pitrou7c404892011-05-13 00:13:33 +02002218 def test_write_rewind_write(self):
2219 # Various combinations of reading / writing / seeking backwards / writing again
2220 def mutate(bufio, pos1, pos2):
2221 assert pos2 >= pos1
2222 # Fill the buffer
2223 bufio.seek(pos1)
2224 bufio.read(pos2 - pos1)
2225 bufio.write(b'\x02')
2226 # This writes earlier than the previous write, but still inside
2227 # the buffer.
2228 bufio.seek(pos1)
2229 bufio.write(b'\x01')
2230
2231 b = b"\x80\x81\x82\x83\x84"
2232 for i in range(0, len(b)):
2233 for j in range(i, len(b)):
2234 raw = self.BytesIO(b)
2235 bufio = self.tp(raw, 100)
2236 mutate(bufio, i, j)
2237 bufio.flush()
2238 expected = bytearray(b)
2239 expected[j] = 2
2240 expected[i] = 1
2241 self.assertEqual(raw.getvalue(), expected,
2242 "failed result for i=%d, j=%d" % (i, j))
2243
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00002244 def test_truncate_after_read_or_write(self):
2245 raw = self.BytesIO(b"A" * 10)
2246 bufio = self.tp(raw, 100)
2247 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2248 self.assertEqual(bufio.truncate(), 2)
2249 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2250 self.assertEqual(bufio.truncate(), 4)
2251
def test_misbehaved_io(self):
    """Run both the reader and the writer misbehaved-raw-stream tests."""
    for parent in (BufferedReaderTest, BufferedWriterTest):
        parent.test_misbehaved_io(self)
2255
Antoine Pitroue05565e2011-08-20 14:39:23 +02002256 def test_interleaved_read_write(self):
2257 # Test for issue #12213
2258 with self.BytesIO(b'abcdefgh') as raw:
2259 with self.tp(raw, 100) as f:
2260 f.write(b"1")
2261 self.assertEqual(f.read(1), b'b')
2262 f.write(b'2')
2263 self.assertEqual(f.read1(1), b'd')
2264 f.write(b'3')
2265 buf = bytearray(1)
2266 f.readinto(buf)
2267 self.assertEqual(buf, b'f')
2268 f.write(b'4')
2269 self.assertEqual(f.peek(1), b'h')
2270 f.flush()
2271 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2272
2273 with self.BytesIO(b'abc') as raw:
2274 with self.tp(raw, 100) as f:
2275 self.assertEqual(f.read(1), b'a')
2276 f.write(b"2")
2277 self.assertEqual(f.read(1), b'c')
2278 f.flush()
2279 self.assertEqual(raw.getvalue(), b'a2c')
2280
2281 def test_interleaved_readline_write(self):
2282 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2283 with self.tp(raw) as f:
2284 f.write(b'1')
2285 self.assertEqual(f.readline(), b'b\n')
2286 f.write(b'2')
2287 self.assertEqual(f.readline(), b'def\n')
2288 f.write(b'3')
2289 self.assertEqual(f.readline(), b'\n')
2290 f.flush()
2291 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2292
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002293 # You can't construct a BufferedRandom over a non-seekable stream.
2294 test_unseekable = None
2295
R David Murray67bfe802013-02-23 21:51:05 -05002296
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    """Run the BufferedRandom tests against io.BufferedRandom."""

    tp = io.BufferedRandom

    def test_constructor(self):
        """Also check that re-initializing with a huge buffer size fails."""
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            buffered.__init__(raw, sys.maxsize)

    def test_garbage_collection(self):
        """Run both the reader and the writer garbage-collection tests."""
        for parent in (CBufferedReaderTest, CBufferedWriterTest):
            parent.test_garbage_collection(self)

    def test_args_error(self):
        """Too many positional arguments must name the type in the error."""
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
2318
2319
class PyBufferedRandomTest(BufferedRandomTest):
    """Run the BufferedRandom tests against pyio.BufferedRandom."""

    tp = pyio.BufferedRandom
2322
2323
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002324# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2325# properties:
2326# - A single output character can correspond to many bytes of input.
2327# - The number of input bytes to complete the character can be
2328# undetermined until the last input byte is received.
2329# - The number of input bytes can vary depending on previous input.
2330# - A single input byte can correspond to many characters of output.
2331# - The number of output characters can be undetermined until the
2332# last input byte is received.
2333# - The number of output characters can vary depending on previous input.
2334
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    Stateful, buffering decoder for testing seek/tell behavior.

    Input is a sequence of words.  Words are fixed-length (I bytes) while
    I is non-zero, and variable-length, terminated by a period, while I
    is 0.  In variable-length mode, extra periods are ignored.  Possible
    words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
      - 'o' followed by a number sets the output length, O (maximum 99).
        In both cases a missing number counts as 0.
      - Any other word is converted into a word followed by a period on
        the output.  If O is non-zero the word is truncated or padded
        out with hyphens to make its length equal to O; if O is 0 the
        word is output verbatim.
    I and O are initially set to 1.  A decode() call with final=True also
    terminates whatever word is still buffered.
    """

    def __init__(self, errors='strict'):
        super().__init__(errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and no buffered input."""
        self.buffer = bytearray()
        self.i = 1
        self.o = 1

    def getstate(self):
        """Return (buffered bytes, flags) with I and O packed into flags."""
        # XOR with 1 so that flags = 0 after reset()
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        pending, flags = state
        self.buffer = bytearray(pending)
        packed_i, packed_o = divmod(flags, 100)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        """Consume the bytes of input and return the text produced so far."""
        pieces = []
        period = ord('.')
        for byte in input:
            if self.i == 0 and byte == period:
                # variable-length, terminated with period; a period with
                # nothing buffered is ignored
                if self.buffer:
                    pieces.append(self.process_word())
                continue
            self.buffer.append(byte)
            # fixed-length, terminate after self.i bytes
            if self.i != 0 and len(self.buffer) == self.i:
                pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output."""
        marker = self.buffer[0]
        argument = self.buffer[1:]
        result = ''
        if marker == ord('i'):
            self.i = min(99, int(argument or 0))  # set input length
        elif marker == ord('o'):
            self.o = min(99, int(argument or 0))  # set output length
        else:
            result = self.buffer.decode('ascii')
            if len(result) < self.o:
                result += '-' * self.o  # pad out with hyphens
            if self.o:
                result = result[:self.o]  # truncate to output length
            result += '.'
        self.buffer = bytearray()
        return result

    # The lookup function below only answers while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the 'test_decoder' codec.

        Returns None unless codecEnabled is true and name matches.
        """
        if not (cls.codecEnabled and name == 'test_decoder'):
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)
2419
2420# Register the previous decoder for testing.
2421# Disabled by default, tests will enable it.
2422codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2423
2424
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected output).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.')
    ]

    def test_decoder(self):
        """Decode every table entry in one shot, then test a forced EOF."""
        # Try a few one-shot test cases.
        for data, is_final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, is_final), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002467
2468class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002469
def setUp(self):
    """Remove any stale test file and prepare the shared sample data."""
    support.unlink(support.TESTFN)
    # The same five lines, once with mixed line endings and once with
    # every ending normalized to "\n".
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002474
def tearDown(self):
    """Remove the test file a test may have created."""
    test_path = support.TESTFN
    support.unlink(test_path)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002477
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002478 def test_constructor(self):
2479 r = self.BytesIO(b"\xc3\xa9\n\n")
2480 b = self.BufferedReader(r, 1000)
2481 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002482 t.__init__(b, encoding="latin-1", newline="\r\n")
2483 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002484 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002485 t.__init__(b, encoding="utf-8", line_buffering=True)
2486 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002487 self.assertEqual(t.line_buffering, True)
2488 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002489 self.assertRaises(TypeError, t.__init__, b, newline=42)
2490 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2491
def test_uninitialized(self):
    """Exercise an object created with __new__ but never initialized."""
    wrapper_type = self.TextIOWrapper
    # Creating and immediately dropping an uninitialized object.
    text = wrapper_type.__new__(wrapper_type)
    del text
    text = wrapper_type.__new__(wrapper_type)
    self.assertRaises(Exception, repr, text)
    self.assertRaisesRegex((ValueError, AttributeError),
                           'uninitialized|has no attribute',
                           text.read, 0)
    # Once initialized, the object must be usable.
    text.__init__(self.MockRawIO())
    self.assertEqual(text.read(0), '')
2502
Nick Coghlana9b15242014-02-04 22:11:18 +10002503 def test_non_text_encoding_codecs_are_rejected(self):
2504 # Ensure the constructor complains if passed a codec that isn't
2505 # marked as a text encoding
2506 # http://bugs.python.org/issue20404
2507 r = self.BytesIO()
2508 b = self.BufferedWriter(r)
2509 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2510 self.TextIOWrapper(b, encoding="hex")
2511
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002512 def test_detach(self):
2513 r = self.BytesIO()
2514 b = self.BufferedWriter(r)
2515 t = self.TextIOWrapper(b)
2516 self.assertIs(t.detach(), b)
2517
2518 t = self.TextIOWrapper(b, encoding="ascii")
2519 t.write("howdy")
2520 self.assertFalse(r.getvalue())
2521 t.detach()
2522 self.assertEqual(r.getvalue(), b"howdy")
2523 self.assertRaises(ValueError, t.detach)
2524
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002525 # Operations independent of the detached stream should still work
2526 repr(t)
2527 self.assertEqual(t.encoding, "ascii")
2528 self.assertEqual(t.errors, "strict")
2529 self.assertFalse(t.line_buffering)
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002530 self.assertFalse(t.write_through)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002531
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002532 def test_repr(self):
2533 raw = self.BytesIO("hello".encode("utf-8"))
2534 b = self.BufferedReader(raw)
2535 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002536 modname = self.TextIOWrapper.__module__
2537 self.assertEqual(repr(t),
2538 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2539 raw.name = "dummy"
2540 self.assertEqual(repr(t),
2541 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002542 t.mode = "r"
2543 self.assertEqual(repr(t),
2544 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002545 raw.name = b"dummy"
2546 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002547 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002548
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002549 t.buffer.detach()
2550 repr(t) # Should not raise an exception
2551
def test_recursive_repr(self):
    """repr() of a wrapper whose raw stream's name is the wrapper itself."""
    # Issue #25455
    raw = self.BytesIO()
    text = self.TextIOWrapper(raw)
    with support.swap_attr(raw, 'name', text):
        # A RuntimeError is tolerated here; the test only requires that
        # repr() does not crash.
        try:
            repr(text)  # Should not crash
        except RuntimeError:
            pass
2561
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002562 def test_line_buffering(self):
2563 r = self.BytesIO()
2564 b = self.BufferedWriter(r, 1000)
2565 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002566 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002567 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002568 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002569 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002570 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002571 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002572
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002573 def test_reconfigure_line_buffering(self):
2574 r = self.BytesIO()
2575 b = self.BufferedWriter(r, 1000)
2576 t = self.TextIOWrapper(b, newline="\n", line_buffering=False)
2577 t.write("AB\nC")
2578 self.assertEqual(r.getvalue(), b"")
2579
2580 t.reconfigure(line_buffering=True) # implicit flush
2581 self.assertEqual(r.getvalue(), b"AB\nC")
2582 t.write("DEF\nG")
2583 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2584 t.write("H")
2585 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2586 t.reconfigure(line_buffering=False) # implicit flush
2587 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2588 t.write("IJ")
2589 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2590
2591 # Keeping default value
2592 t.reconfigure()
2593 t.reconfigure(line_buffering=None)
2594 self.assertEqual(t.line_buffering, False)
2595 t.reconfigure(line_buffering=True)
2596 t.reconfigure()
2597 t.reconfigure(line_buffering=None)
2598 self.assertEqual(t.line_buffering, True)
2599
@unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
def test_default_encoding(self):
    """Without an explicit encoding, the current locale encoding is used."""
    saved_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding to check that TextIOWrapper() uses the current
        # locale encoding and not the user preferred encoding
        for name in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(name, None)

        current_locale_encoding = locale.getpreferredencoding(False)
        text = self.TextIOWrapper(self.BytesIO())
        self.assertEqual(text.encoding, current_locale_encoding)
    finally:
        # Put the environment back exactly as it was found.
        os.environ.clear()
        os.environ.update(saved_environ)
2618
@support.cpython_only
@unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
def test_device_encoding(self):
    """An out-of-range fileno() must raise OverflowError on construction."""
    # Issue 15989
    import _testcapi
    raw = self.BytesIO()
    for too_big in (_testcapi.INT_MAX + 1, _testcapi.UINT_MAX + 1):
        # Bind the value as a default so each lambda keeps its own number.
        raw.fileno = lambda value=too_big: value
        self.assertRaises(OverflowError, self.TextIOWrapper, raw)
2629
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002630 def test_encoding(self):
2631 # Check the encoding attribute is always set, and valid
2632 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002633 t = self.TextIOWrapper(b, encoding="utf-8")
2634 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002635 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002636 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002637 codecs.lookup(t.encoding)
2638
2639 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002640 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002641 b = self.BytesIO(b"abc\n\xff\n")
2642 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002643 self.assertRaises(UnicodeError, t.read)
2644 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002645 b = self.BytesIO(b"abc\n\xff\n")
2646 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002647 self.assertRaises(UnicodeError, t.read)
2648 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002649 b = self.BytesIO(b"abc\n\xff\n")
2650 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002651 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002652 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002653 b = self.BytesIO(b"abc\n\xff\n")
2654 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002655 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002656
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002657 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002658 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002659 b = self.BytesIO()
2660 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002661 self.assertRaises(UnicodeError, t.write, "\xff")
2662 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002663 b = self.BytesIO()
2664 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002665 self.assertRaises(UnicodeError, t.write, "\xff")
2666 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002667 b = self.BytesIO()
2668 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002669 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002670 t.write("abc\xffdef\n")
2671 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002672 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002673 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002674 b = self.BytesIO()
2675 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002676 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002677 t.write("abc\xffdef\n")
2678 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002679 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002680
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002681 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002682 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2683
2684 tests = [
2685 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002686 [ '', input_lines ],
2687 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2688 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2689 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002690 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002691 encodings = (
2692 'utf-8', 'latin-1',
2693 'utf-16', 'utf-16-le', 'utf-16-be',
2694 'utf-32', 'utf-32-le', 'utf-32-be',
2695 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002696
Guido van Rossum8358db22007-08-18 21:39:55 +00002697 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002698 # character in TextIOWrapper._pending_line.
2699 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002700 # XXX: str.encode() should return bytes
2701 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002702 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002703 for bufsize in range(1, 10):
2704 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002705 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2706 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002707 encoding=encoding)
2708 if do_reads:
2709 got_lines = []
2710 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002711 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002712 if c2 == '':
2713 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002714 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002715 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002716 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002717 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002718
2719 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002720 self.assertEqual(got_line, exp_line)
2721 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002722
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002723 def test_newlines_input(self):
2724 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002725 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2726 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002727 (None, normalized.decode("ascii").splitlines(keepends=True)),
2728 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002729 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2730 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2731 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002732 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002733 buf = self.BytesIO(testdata)
2734 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002735 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002736 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002737 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002738
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002739 def test_newlines_output(self):
2740 testdict = {
2741 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2742 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2743 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2744 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2745 }
2746 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2747 for newline, expected in tests:
2748 buf = self.BytesIO()
2749 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2750 txt.write("AAA\nB")
2751 txt.write("BB\nCCC\n")
2752 txt.write("X\rY\r\nZ")
2753 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002754 self.assertEqual(buf.closed, False)
2755 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002756
def test_destructor(self):
    """Dropping the wrapper flushes pending text and closes the raw stream."""
    seen_at_close = []
    base = self.BytesIO

    class RecordingBytesIO(base):
        def close(self):
            # Capture the contents at the moment close() is invoked.
            seen_at_close.append(self.getvalue())
            base.close(self)

    raw = RecordingBytesIO()
    text = self.TextIOWrapper(raw, encoding="ascii")
    text.write("abc")
    del text
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002770
def test_override_destructor(self):
    """Overridden __del__, close() and flush() run in that order."""
    record = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            record.append(1)
            # The parent may or may not define __del__.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()

        def close(self):
            record.append(2)
            super().close()

        def flush(self):
            record.append(3)
            super().flush()

    text = MyTextIO(self.BytesIO(), encoding="ascii")
    del text
    support.gc_collect()
    self.assertEqual(record, [1, 2, 3])
2793
def test_error_through_destructor(self):
    """A failing close() in the destructor must not mask the real error."""
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception OSError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Guido van Rossum8358db22007-08-18 21:39:55 +00002808
Guido van Rossum9b76da62007-04-11 01:09:03 +00002809 # Systematic tests of the text I/O API
2810
def test_basic_io(self):
    # Exercise write/read/seek/tell round trips for several encodings and
    # a range of internal chunk sizes.  Files are managed with "with" so
    # that a failing assertion does not leak an open file object.
    chunk_sizes = (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65)
    # "utf-16-be" and "utf-16-le" are currently not exercised here.
    encodings = ("ascii", "latin-1", "utf-8")
    for chunksize in chunk_sizes:
        for enc in encodings:
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Opaque position of end-of-file, reused below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2839
def multi_line_test(self, f, enc):
    # Rewrite the file with lines of many different lengths, remembering
    # the tell() position in front of each one, then read the lines back
    # and check that both the positions and the text match.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    line_sizes = (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000)
    written = []
    for size in line_sizes:
        text = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        written.append((f.tell(), text))
        f.write(text)
    f.seek(0)
    read_back = []
    while True:
        position = f.tell()
        text = f.readline()
        if not text:
            break
        read_back.append((position, text))
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002861
def test_telling(self):
    # tell() must report the same positions while reading back as it did
    # while writing, and must be refused during line iteration.  The file
    # is managed with "with" so a failing assertion cannot leak it.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to fail while the iterator is active.
            self.assertRaises(OSError, f.tell)
        # Once iteration is exhausted tell() works again.
        self.assertEqual(f.tell(), p2)
2881
def test_seeking(self):
    # Build a line whose multi-byte suffix starts two characters before
    # the default chunk size, write it twice, then check read(), tell()
    # and readline() around that point.
    prefix_size = _default_chunk_size() - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    line = prefix + u_suffix.encode("utf-8")
    with self.open(support.TESTFN, "wb") as binary_file:
        binary_file.write(line * 2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as text_file:
        head = text_file.read(prefix_size)
        self.assertEqual(head, prefix.decode("ascii"))
        self.assertEqual(text_file.tell(), prefix_size)
        self.assertEqual(text_file.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002898
def test_seeking_too(self):
    # Regression test for a specific bug: readline() followed by tell()
    # on a multi-byte character with a chunk size of 2.
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as binary_file:
        binary_file.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as text_file:
        text_file._CHUNK_SIZE  # Just test that it exists
        text_file._CHUNK_SIZE = 2
        text_file.readline()
        text_file.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002909
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened in a "with" block so that a failing
        # assertion cannot leave it open.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                # NOTE(review): unlike the initial decode above, the file
                # reopened here keeps its default _CHUNK_SIZE -- confirm
                # whether CHUNK_SIZE was meant to apply here as well.
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(case_input)

        # Position each test case so that it crosses a chunk boundary.
        for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(case_input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + case_input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002956
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002957 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002958 data = "1234567890"
2959 tests = ("utf-16",
2960 "utf-16-le",
2961 "utf-16-be",
2962 "utf-32",
2963 "utf-32-le",
2964 "utf-32-be")
2965 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002966 buf = self.BytesIO()
2967 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002968 # Check if the BOM is written only once (see issue1753).
2969 f.write(data)
2970 f.write(data)
2971 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002972 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002973 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002974 self.assertEqual(f.read(), data * 2)
2975 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002976
Benjamin Petersona1b49012009-03-31 23:11:32 +00002977 def test_unreadable(self):
2978 class UnReadable(self.BytesIO):
2979 def readable(self):
2980 return False
2981 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002982 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002983
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002984 def test_read_one_by_one(self):
2985 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002986 reads = ""
2987 while True:
2988 c = txt.read(1)
2989 if not c:
2990 break
2991 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002992 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002993
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002994 def test_readlines(self):
2995 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2996 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2997 txt.seek(0)
2998 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2999 txt.seek(0)
3000 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
3001
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003002 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003003 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003004 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003005 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003006 reads = ""
3007 while True:
3008 c = txt.read(128)
3009 if not c:
3010 break
3011 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003012 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003013
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02003014 def test_writelines(self):
3015 l = ['ab', 'cd', 'ef']
3016 buf = self.BytesIO()
3017 txt = self.TextIOWrapper(buf)
3018 txt.writelines(l)
3019 txt.flush()
3020 self.assertEqual(buf.getvalue(), b'abcdef')
3021
3022 def test_writelines_userlist(self):
3023 l = UserList(['ab', 'cd', 'ef'])
3024 buf = self.BytesIO()
3025 txt = self.TextIOWrapper(buf)
3026 txt.writelines(l)
3027 txt.flush()
3028 self.assertEqual(buf.getvalue(), b'abcdef')
3029
3030 def test_writelines_error(self):
3031 txt = self.TextIOWrapper(self.BytesIO())
3032 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3033 self.assertRaises(TypeError, txt.writelines, None)
3034 self.assertRaises(TypeError, txt.writelines, b'abc')
3035
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003036 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003037 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003038
3039 # read one char at a time
3040 reads = ""
3041 while True:
3042 c = txt.read(1)
3043 if not c:
3044 break
3045 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003046 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003047
3048 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003049 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003050 txt._CHUNK_SIZE = 4
3051
3052 reads = ""
3053 while True:
3054 c = txt.read(4)
3055 if not c:
3056 break
3057 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003058 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003059
3060 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003061 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003062 txt._CHUNK_SIZE = 4
3063
3064 reads = txt.read(4)
3065 reads += txt.read(4)
3066 reads += txt.readline()
3067 reads += txt.readline()
3068 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003069 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003070
3071 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003072 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003073 txt._CHUNK_SIZE = 4
3074
3075 reads = txt.read(4)
3076 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003077 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003078
3079 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003080 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003081 txt._CHUNK_SIZE = 4
3082
3083 reads = txt.read(4)
3084 pos = txt.tell()
3085 txt.seek(0)
3086 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003087 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003088
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003089 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003090 buffer = self.BytesIO(self.testdata)
3091 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003092
3093 self.assertEqual(buffer.seekable(), txt.seekable())
3094
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN

    def assert_file_holds(text, charset):
        # Compare the raw bytes on disk with a single encode() of *text*.
        with self.open(path, 'rb') as binary_file:
            self.assertEqual(binary_file.read(), text.encode(charset))

    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as text_file:
            text_file.write('aaa')
            text_file.tell()
        assert_file_holds('aaa', charset)

        with self.open(path, 'a', encoding=charset) as text_file:
            text_file.write('xxx')
        assert_file_holds('aaaxxx', charset)
Antoine Pitroue4501852009-05-14 18:55:55 +00003109
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as text_file:
            text_file.write('aaa')
            end_of_first_write = text_file.tell()
        with self.open(path, 'r+', encoding=charset) as text_file:
            # Write past the existing text first, then overwrite the start.
            text_file.seek(end_of_first_write)
            text_file.write('zzz')
            text_file.seek(0)
            text_file.write('bbb')
        with self.open(path, 'rb') as binary_file:
            on_disk = binary_file.read()
            self.assertEqual(on_disk, 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003124
def test_seek_append_bom(self):
    # Same test, but first seek to the start and then to the end
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as text_file:
            text_file.write('aaa')
        with self.open(path, 'a', encoding=charset) as text_file:
            text_file.seek(0)
            text_file.seek(0, self.SEEK_END)
            text_file.write('xxx')
        with self.open(path, 'rb') as binary_file:
            on_disk = binary_file.read()
            self.assertEqual(on_disk, 'aaaxxx'.encode(charset))
3137
def test_errors_property(self):
    # The "errors" attribute is "strict" by default and otherwise
    # reflects the value passed to open().
    path = support.TESTFN
    with self.open(path, "w") as default_file:
        self.assertEqual(default_file.errors, "strict")
    with self.open(path, "w", errors="replace") as replacing_file:
        self.assertEqual(replacing_file.errors, "replace")
3143
@support.no_tracing
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    thread_count = 20
    start_signal = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            # Format the line first so that all threads write as close
            # together as possible once the event fires.
            text = "Thread%03d\n" % n
            start_signal.wait()
            f.write(text)

        threads = []
        for x in range(thread_count):
            threads.append(threading.Thread(target=run, args=(x,)))
        with support.start_threads(threads, start_signal.set):
            time.sleep(0.02)
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Every thread's line must appear exactly once.
        for n in range(thread_count):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003161
Antoine Pitrou6be88762010-05-03 16:48:20 +00003162 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003163 # Test that text file is closed despite failed flush
3164 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00003165 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003166 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00003167 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003168 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003169 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00003170 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003171 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06003172 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003173 self.assertTrue(txt.buffer.closed)
3174 self.assertTrue(closed) # flush() called
3175 self.assertFalse(closed[0]) # flush() called before file closed
3176 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02003177 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003178
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003179 def test_close_error_on_close(self):
3180 buffer = self.BytesIO(self.testdata)
3181 def bad_flush():
3182 raise OSError('flush')
3183 def bad_close():
3184 raise OSError('close')
3185 buffer.close = bad_close
3186 txt = self.TextIOWrapper(buffer, encoding="ascii")
3187 txt.flush = bad_flush
3188 with self.assertRaises(OSError) as err: # exception not swallowed
3189 txt.close()
3190 self.assertEqual(err.exception.args, ('close',))
3191 self.assertIsInstance(err.exception.__context__, OSError)
3192 self.assertEqual(err.exception.__context__.args, ('flush',))
3193 self.assertFalse(txt.closed)
3194
3195 def test_nonnormalized_close_error_on_close(self):
3196 # Issue #21677
3197 buffer = self.BytesIO(self.testdata)
3198 def bad_flush():
3199 raise non_existing_flush
3200 def bad_close():
3201 raise non_existing_close
3202 buffer.close = bad_close
3203 txt = self.TextIOWrapper(buffer, encoding="ascii")
3204 txt.flush = bad_flush
3205 with self.assertRaises(NameError) as err: # exception not swallowed
3206 txt.close()
3207 self.assertIn('non_existing_close', str(err.exception))
3208 self.assertIsInstance(err.exception.__context__, NameError)
3209 self.assertIn('non_existing_flush', str(err.exception.__context__))
3210 self.assertFalse(txt.closed)
3211
Antoine Pitrou6be88762010-05-03 16:48:20 +00003212 def test_multi_close(self):
3213 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3214 txt.close()
3215 txt.close()
3216 txt.close()
3217 self.assertRaises(ValueError, txt.flush)
3218
def test_unseekable(self):
    # tell() and seek() on a wrapper around MockUnseekableIO must raise
    # UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.tell()
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.seek(0)
3223
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003224 def test_readonly_attributes(self):
3225 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3226 buf = self.BytesIO(self.testdata)
3227 with self.assertRaises(AttributeError):
3228 txt.buffer = buf
3229
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks),
                                 encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(wrapper.read(4), 'abcd')
    self.assertEqual(wrapper.readline(), 'efghi\n')
    remaining_lines = list(wrapper)
    self.assertEqual(remaining_lines, ['jkl\n', 'opq\n'])
3240
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for piece in ('1', '23\n4', '5'):
        wrapper.write(piece)
    written = b''.join(raw._write_stack)
    self.assertEqual(written, b'123\n45')
3250
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003251 def test_bufio_write_through(self):
3252 # Issue #21396: write_through=True doesn't force a flush()
3253 # on the underlying binary buffered object.
3254 flush_called, write_called = [], []
3255 class BufferedWriter(self.BufferedWriter):
3256 def flush(self, *args, **kwargs):
3257 flush_called.append(True)
3258 return super().flush(*args, **kwargs)
3259 def write(self, *args, **kwargs):
3260 write_called.append(True)
3261 return super().write(*args, **kwargs)
3262
3263 rawio = self.BytesIO()
3264 data = b"a"
3265 bufio = BufferedWriter(rawio, len(data)*2)
3266 textio = self.TextIOWrapper(bufio, encoding='ascii',
3267 write_through=True)
3268 # write to the buffered io but don't overflow the buffer
3269 text = data.decode('ascii')
3270 textio.write(text)
3271
3272 # buffer.flush is not called with write_through=True
3273 self.assertFalse(flush_called)
3274 # buffer.write *is* called with write_through=True
3275 self.assertTrue(write_called)
3276 self.assertEqual(rawio.getvalue(), b"") # no flush
3277
3278 write_called = [] # reset
3279 textio.write(text * 10) # total content is larger than bufio buffer
3280 self.assertTrue(write_called)
3281 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3282
def test_reconfigure_write_through(self):
    # Switch write_through on and off via reconfigure() and check what
    # has reached the raw stream after each step.
    raw = self.MockRawIO([])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n')

    def written():
        # Everything the raw stream has received so far.
        return b''.join(raw._write_stack)

    wrapper.write('1')
    wrapper.reconfigure(write_through=True)  # implied flush
    self.assertEqual(wrapper.write_through, True)
    self.assertEqual(written(), b'1')
    wrapper.write('23')
    self.assertEqual(written(), b'123')
    wrapper.reconfigure(write_through=False)
    self.assertEqual(wrapper.write_through, False)
    wrapper.write('45')
    wrapper.flush()
    self.assertEqual(written(), b'12345')
    # Keeping default value
    wrapper.reconfigure()
    wrapper.reconfigure(write_through=None)
    self.assertEqual(wrapper.write_through, False)
    wrapper.reconfigure(write_through=True)
    wrapper.reconfigure()
    wrapper.reconfigure(write_through=None)
    self.assertEqual(wrapper.write_through, True)
3305
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003306 def test_read_nonbytes(self):
3307 # Issue #17106
3308 # Crash when underlying read() returns non-bytes
3309 t = self.TextIOWrapper(self.StringIO('a'))
3310 self.assertRaises(TypeError, t.read, 1)
3311 t = self.TextIOWrapper(self.StringIO('a'))
3312 self.assertRaises(TypeError, t.readline)
3313 t = self.TextIOWrapper(self.StringIO('a'))
3314 self.assertRaises(TypeError, t.read)
3315
def test_illegal_encoder(self):
    # Issue 31271: Calling write() while the return value of encoder's
    # encode() is invalid shouldn't cause an assertion failure.
    rot13 = codecs.lookup("rot13")
    # Temporarily mark rot13 as a text encoding so that the constructor
    # accepts it.
    with support.swap_attr(rot13, '_is_text_encoding', True):
        # Use the implementation under test (self.*) rather than the
        # global io module, so both the C and the Python TextIOWrapper
        # are exercised, like every other test in this class.
        t = self.TextIOWrapper(self.BytesIO(b'foo'), encoding="rot13")
    self.assertRaises(TypeError, t.write, 'bar')
3323
def test_illegal_decoder(self):
    # Issue #17106
    # Bypass the early encoding check added in issue 20404
    def _make_illegal_wrapper():
        quopri = codecs.lookup("quopri")
        quopri._is_text_encoding = True
        try:
            return self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
                                      newline='\n', encoding="quopri")
        finally:
            quopri._is_text_encoding = False

    # Crash when decoder returns non-string
    for method_name, args in (("read", (1,)), ("readline", ()),
                              ("read", ())):
        wrapper = _make_illegal_wrapper()
        self.assertRaises(TypeError, getattr(wrapper, method_name), *args)

    # Issue 31243: calling read() while the return value of decoder's
    # getstate() is invalid should neither crash the interpreter nor
    # raise a SystemError.
    def _make_very_illegal_wrapper(getstate_ret_val):
        class BadDecoder:
            def getstate(self):
                return getstate_ret_val

        def _get_bad_decoder(dummy):
            return BadDecoder()

        quopri = codecs.lookup("quopri")
        with support.swap_attr(quopri, 'incrementaldecoder',
                               _get_bad_decoder):
            return _make_illegal_wrapper()

    for bad_state in (42, (), (1, 2)):
        wrapper = _make_very_illegal_wrapper(bad_state)
        self.assertRaises(TypeError, wrapper.read, 42)
3363
def _check_create_at_shutdown(self, **kwargs):
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    #
    # Runs a child interpreter whose script builds a TextIOWrapper (with
    # **kwargs) from a __del__ method of a module-level object, and
    # returns the result of assert_python_ok().
    script = """if 1:
            import codecs
            import {iomod} as io

            # Avoid looking up codecs at shutdown
            codecs.lookup('utf-8')

            class C:
                def __init__(self):
                    self.buf = io.BytesIO()
                def __del__(self):
                    io.TextIOWrapper(self.buf, **{kwargs})
                    print("ok")
            c = C()
            """.format(iomod=self.io.__name__, kwargs=kwargs)
    return assert_python_ok("-c", script)
3384
@support.requires_type_collecting
def test_create_at_shutdown_without_encoding(self):
    _, stdout, stderr = self._check_create_at_shutdown()
    if not stderr:
        self.assertEqual("ok", stdout.decode().strip())
        return
    # Can error out with a RuntimeError if the module state
    # isn't found.
    self.assertIn(self.shutdown_error, stderr.decode())
3394
@support.requires_type_collecting
def test_create_at_shutdown_with_encoding(self):
    # With an explicit encoding and error handler the child must print
    # "ok" and leave stderr empty.
    options = {'encoding': 'utf-8', 'errors': 'strict'}
    _, stdout, stderr = self._check_create_at_shutdown(**options)
    self.assertFalse(stderr)
    self.assertEqual("ok", stdout.decode().strip())
3401
def test_read_byteslike(self):
    raw = MemviewBytesIO(b'Just some random string\n')
    wrapper = self.TextIOWrapper(raw, 'utf-8')

    # TextIOwrapper will not read the full string, because
    # we truncate it to a multiple of the native int size
    # so that we can construct a more complex memoryview.
    expected_bytes = _to_memoryview(raw.getvalue()).tobytes()
    expected_text = expected_bytes.decode('utf-8')

    self.assertEqual(wrapper.read(200), expected_text)
3412
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003413 def test_issue22849(self):
3414 class F(object):
3415 def readable(self): return True
3416 def writable(self): return True
3417 def seekable(self): return True
3418
3419 for i in range(10):
3420 try:
3421 self.TextIOWrapper(F(), encoding='utf-8')
3422 except Exception:
3423 pass
3424
3425 F.tell = lambda x: 0
3426 t = self.TextIOWrapper(F(), encoding='utf-8')
3427
INADA Naoki507434f2017-12-21 09:59:53 +09003428 def test_reconfigure_encoding_read(self):
3429 # latin1 -> utf8
3430 # (latin1 can decode utf-8 encoded string)
3431 data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3432 raw = self.BytesIO(data)
3433 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3434 self.assertEqual(txt.readline(), 'abc\xe9\n')
3435 with self.assertRaises(self.UnsupportedOperation):
3436 txt.reconfigure(encoding='utf-8')
3437 with self.assertRaises(self.UnsupportedOperation):
3438 txt.reconfigure(newline=None)
3439
3440 def test_reconfigure_write_fromascii(self):
3441 # ascii has a specific encodefunc in the C implementation,
3442 # but utf-8-sig has not. Make sure that we get rid of the
3443 # cached encodefunc when we switch encoders.
3444 raw = self.BytesIO()
3445 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3446 txt.write('foo\n')
3447 txt.reconfigure(encoding='utf-8-sig')
3448 txt.write('\xe9\n')
3449 txt.flush()
3450 self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3451
3452 def test_reconfigure_write(self):
3453 # latin -> utf8
3454 raw = self.BytesIO()
3455 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3456 txt.write('abc\xe9\n')
3457 txt.reconfigure(encoding='utf-8')
3458 self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3459 txt.write('d\xe9f\n')
3460 txt.flush()
3461 self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3462
3463 # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3464 # the file
3465 raw = self.BytesIO()
3466 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3467 txt.write('abc\n')
3468 txt.reconfigure(encoding='utf-8-sig')
3469 txt.write('d\xe9f\n')
3470 txt.flush()
3471 self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3472
3473 def test_reconfigure_write_non_seekable(self):
3474 raw = self.BytesIO()
3475 raw.seekable = lambda: False
3476 raw.seek = None
3477 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3478 txt.write('abc\n')
3479 txt.reconfigure(encoding='utf-8-sig')
3480 txt.write('d\xe9f\n')
3481 txt.flush()
3482
3483 # If the raw stream is not seekable, there'll be a BOM
3484 self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3485
3486 def test_reconfigure_defaults(self):
3487 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
3488 txt.reconfigure(encoding=None)
3489 self.assertEqual(txt.encoding, 'ascii')
3490 self.assertEqual(txt.errors, 'replace')
3491 txt.write('LF\n')
3492
3493 txt.reconfigure(newline='\r\n')
3494 self.assertEqual(txt.encoding, 'ascii')
3495 self.assertEqual(txt.errors, 'replace')
3496
3497 txt.reconfigure(errors='ignore')
3498 self.assertEqual(txt.encoding, 'ascii')
3499 self.assertEqual(txt.errors, 'ignore')
3500 txt.write('CRLF\n')
3501
3502 txt.reconfigure(encoding='utf-8', newline=None)
3503 self.assertEqual(txt.errors, 'strict')
3504 txt.seek(0)
3505 self.assertEqual(txt.read(), 'LF\nCRLF\n')
3506
3507 self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3508
3509 def test_reconfigure_newline(self):
3510 raw = self.BytesIO(b'CR\rEOF')
3511 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3512 txt.reconfigure(newline=None)
3513 self.assertEqual(txt.readline(), 'CR\n')
3514 raw = self.BytesIO(b'CR\rEOF')
3515 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3516 txt.reconfigure(newline='')
3517 self.assertEqual(txt.readline(), 'CR\r')
3518 raw = self.BytesIO(b'CR\rLF\nEOF')
3519 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3520 txt.reconfigure(newline='\n')
3521 self.assertEqual(txt.readline(), 'CR\rLF\n')
3522 raw = self.BytesIO(b'LF\nCR\rEOF')
3523 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3524 txt.reconfigure(newline='\r')
3525 self.assertEqual(txt.readline(), 'LF\nCR\r')
3526 raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3527 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3528 txt.reconfigure(newline='\r\n')
3529 self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3530
3531 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3532 txt.reconfigure(newline=None)
3533 txt.write('linesep\n')
3534 txt.reconfigure(newline='')
3535 txt.write('LF\n')
3536 txt.reconfigure(newline='\n')
3537 txt.write('LF\n')
3538 txt.reconfigure(newline='\r')
3539 txt.write('CR\n')
3540 txt.reconfigure(newline='\r\n')
3541 txt.write('CRLF\n')
3542 expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3543 self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3544
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003545
class MemviewBytesIO(io.BytesIO):
    '''A BytesIO object whose read methods hand back memoryviews
    instead of bytes'''

    def read1(self, len_):
        # Read through BytesIO.read1(), then wrap the result.
        chunk = super().read1(len_)
        return _to_memoryview(chunk)

    def read(self, len_):
        # Read through BytesIO.read(), then wrap the result.
        chunk = super().read(len_)
        return _to_memoryview(chunk)
3555
3556def _to_memoryview(buf):
3557 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3558
3559 arr = array.array('i')
3560 idx = len(buf) - len(buf) % arr.itemsize
3561 arr.frombytes(buf[:idx])
3562 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003563
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003564
class CTextIOWrapperTest(TextIOWrapperTest):
    # TextIOWrapper tests bound to the ``io`` module, plus a few checks
    # that only apply to that implementation.
    io = io
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        # After a failed re-initialization, reading must fail as well.
        with self.assertRaises(ValueError):
            wrapper.__init__(buffered, newline='xyzzy')
        with self.assertRaises(ValueError):
            wrapper.read()

        # repr() of an object that never went through __init__() must
        # raise.
        bare = self.TextIOWrapper.__new__(self.TextIOWrapper)
        with self.assertRaises(Exception):
            repr(bare)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            rawio = io.FileIO(support.TESTFN, "wb")
            buffered = self.BufferedWriter(rawio)
            wrapper = self.TextIOWrapper(buffered, encoding="ascii")
            wrapper.write("456def")
            # Self-reference, so that only the cyclic GC can free it.
            wrapper.x = wrapper
            ref = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as stored:
            self.assertEqual(stored.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text1 = self.TextIOWrapper(pair1, encoding="ascii")
            pair2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text2 = self.TextIOWrapper(pair2, encoding="ascii")
            # circular references
            text1.buddy = text2
            text2.buddy = text1
        support.gc_collect()
3609
3610
class PyTextIOWrapperTest(TextIOWrapperTest):
    # TextIOWrapper tests bound to the ``pyio`` module.
    io = pyio
    shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003614
3615
3616class IncrementalNewlineDecoderTest(unittest.TestCase):
3617
3618 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003619 # UTF-8 specific tests for a newline decoder
3620 def _check_decode(b, s, **kwargs):
3621 # We exercise getstate() / setstate() as well as decode()
3622 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003623 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003624 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003625 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003626
Antoine Pitrou180a3362008-12-14 16:36:46 +00003627 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003628
Antoine Pitrou180a3362008-12-14 16:36:46 +00003629 _check_decode(b'\xe8', "")
3630 _check_decode(b'\xa2', "")
3631 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003632
Antoine Pitrou180a3362008-12-14 16:36:46 +00003633 _check_decode(b'\xe8', "")
3634 _check_decode(b'\xa2', "")
3635 _check_decode(b'\x88', "\u8888")
3636
3637 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003638 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3639
Antoine Pitrou180a3362008-12-14 16:36:46 +00003640 decoder.reset()
3641 _check_decode(b'\n', "\n")
3642 _check_decode(b'\r', "")
3643 _check_decode(b'', "\n", final=True)
3644 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003645
Antoine Pitrou180a3362008-12-14 16:36:46 +00003646 _check_decode(b'\r', "")
3647 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003648
Antoine Pitrou180a3362008-12-14 16:36:46 +00003649 _check_decode(b'\r\r\n', "\n\n")
3650 _check_decode(b'\r', "")
3651 _check_decode(b'\r', "\n")
3652 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003653
Antoine Pitrou180a3362008-12-14 16:36:46 +00003654 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3655 _check_decode(b'\xe8\xa2\x88', "\u8888")
3656 _check_decode(b'\n', "\n")
3657 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3658 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003659
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003660 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003661 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003662 if encoding is not None:
3663 encoder = codecs.getincrementalencoder(encoding)()
3664 def _decode_bytewise(s):
3665 # Decode one byte at a time
3666 for b in encoder.encode(s):
3667 result.append(decoder.decode(bytes([b])))
3668 else:
3669 encoder = None
3670 def _decode_bytewise(s):
3671 # Decode one char at a time
3672 for c in s:
3673 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003674 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003675 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003676 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003677 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003678 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003679 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003680 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003681 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003682 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003683 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003684 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003685 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003686 input = "abc"
3687 if encoder is not None:
3688 encoder.reset()
3689 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003690 self.assertEqual(decoder.decode(input), "abc")
3691 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003692
3693 def test_newline_decoder(self):
3694 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003695 # None meaning the IncrementalNewlineDecoder takes unicode input
3696 # rather than bytes input
3697 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003698 'utf-16', 'utf-16-le', 'utf-16-be',
3699 'utf-32', 'utf-32-le', 'utf-32-be',
3700 )
3701 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003702 decoder = enc and codecs.getincrementaldecoder(enc)()
3703 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3704 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003705 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003706 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3707 self.check_newline_decoding_utf8(decoder)
Oren Milman1d1d3e92017-08-20 18:35:36 +03003708 self.assertRaises(TypeError, decoder.setstate, 42)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003709
Antoine Pitrou66913e22009-03-06 23:40:56 +00003710 def test_newline_bytes(self):
3711 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3712 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003713 self.assertEqual(dec.newlines, None)
3714 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3715 self.assertEqual(dec.newlines, None)
3716 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3717 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003718 dec = self.IncrementalNewlineDecoder(None, translate=False)
3719 _check(dec)
3720 dec = self.IncrementalNewlineDecoder(None, translate=True)
3721 _check(dec)
3722
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; every test is inherited from the base class.
    pass
3725
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; every test is inherited from the base class.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003728
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003729
Guido van Rossum01a27522007-03-07 01:00:12 +00003730# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003731
Guido van Rossum5abbf752007-08-27 17:39:33 +00003732class MiscIOTest(unittest.TestCase):
3733
Barry Warsaw40e82462008-11-20 20:14:50 +00003734 def tearDown(self):
3735 support.unlink(support.TESTFN)
3736
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003737 def test___all__(self):
3738 for name in self.io.__all__:
3739 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003740 self.assertIsNotNone(obj, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00003741 if name == "open":
3742 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003743 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003744 self.assertTrue(issubclass(obj, Exception), name)
3745 elif not name.startswith("SEEK_"):
3746 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003747
Barry Warsaw40e82462008-11-20 20:14:50 +00003748 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003749 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003750 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003751 f.close()
3752
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003753 with support.check_warnings(('', DeprecationWarning)):
3754 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003755 self.assertEqual(f.name, support.TESTFN)
3756 self.assertEqual(f.buffer.name, support.TESTFN)
3757 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3758 self.assertEqual(f.mode, "U")
3759 self.assertEqual(f.buffer.mode, "rb")
3760 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003761 f.close()
3762
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003763 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003764 self.assertEqual(f.mode, "w+")
3765 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3766 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003767
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003768 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003769 self.assertEqual(g.mode, "wb")
3770 self.assertEqual(g.raw.mode, "wb")
3771 self.assertEqual(g.name, f.fileno())
3772 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003773 f.close()
3774 g.close()
3775
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003776 def test_io_after_close(self):
3777 for kwargs in [
3778 {"mode": "w"},
3779 {"mode": "wb"},
3780 {"mode": "w", "buffering": 1},
3781 {"mode": "w", "buffering": 2},
3782 {"mode": "wb", "buffering": 0},
3783 {"mode": "r"},
3784 {"mode": "rb"},
3785 {"mode": "r", "buffering": 1},
3786 {"mode": "r", "buffering": 2},
3787 {"mode": "rb", "buffering": 0},
3788 {"mode": "w+"},
3789 {"mode": "w+b"},
3790 {"mode": "w+", "buffering": 1},
3791 {"mode": "w+", "buffering": 2},
3792 {"mode": "w+b", "buffering": 0},
3793 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003794 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003795 f.close()
3796 self.assertRaises(ValueError, f.flush)
3797 self.assertRaises(ValueError, f.fileno)
3798 self.assertRaises(ValueError, f.isatty)
3799 self.assertRaises(ValueError, f.__iter__)
3800 if hasattr(f, "peek"):
3801 self.assertRaises(ValueError, f.peek, 1)
3802 self.assertRaises(ValueError, f.read)
3803 if hasattr(f, "read1"):
3804 self.assertRaises(ValueError, f.read1, 1024)
Martin Panterccb2c0e2016-10-20 23:48:14 +00003805 self.assertRaises(ValueError, f.read1)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003806 if hasattr(f, "readall"):
3807 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003808 if hasattr(f, "readinto"):
3809 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003810 if hasattr(f, "readinto1"):
3811 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003812 self.assertRaises(ValueError, f.readline)
3813 self.assertRaises(ValueError, f.readlines)
Xiang Zhang026435c2017-04-15 12:47:28 +08003814 self.assertRaises(ValueError, f.readlines, 1)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003815 self.assertRaises(ValueError, f.seek, 0)
3816 self.assertRaises(ValueError, f.tell)
3817 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003818 self.assertRaises(ValueError, f.write,
3819 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003820 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003821 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003822
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003823 def test_blockingioerror(self):
3824 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003825 class C(str):
3826 pass
3827 c = C("")
3828 b = self.BlockingIOError(1, c)
3829 c.b = b
3830 b.c = c
3831 wr = weakref.ref(c)
3832 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003833 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003834 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003835
3836 def test_abcs(self):
3837 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003838 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3839 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3840 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3841 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003842
3843 def _check_abc_inheritance(self, abcmodule):
3844 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003845 self.assertIsInstance(f, abcmodule.IOBase)
3846 self.assertIsInstance(f, abcmodule.RawIOBase)
3847 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3848 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003849 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003850 self.assertIsInstance(f, abcmodule.IOBase)
3851 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3852 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3853 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003854 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003855 self.assertIsInstance(f, abcmodule.IOBase)
3856 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3857 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3858 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003859
3860 def test_abc_inheritance(self):
3861 # Test implementations inherit from their respective ABCs
3862 self._check_abc_inheritance(self)
3863
3864 def test_abc_inheritance_official(self):
3865 # Test implementations inherit from the official ABCs of the
3866 # baseline "io" module.
3867 self._check_abc_inheritance(io)
3868
Antoine Pitroue033e062010-10-29 10:38:18 +00003869 def _check_warn_on_dealloc(self, *args, **kwargs):
3870 f = open(*args, **kwargs)
3871 r = repr(f)
3872 with self.assertWarns(ResourceWarning) as cm:
3873 f = None
3874 support.gc_collect()
3875 self.assertIn(r, str(cm.warning.args[0]))
3876
3877 def test_warn_on_dealloc(self):
3878 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3879 self._check_warn_on_dealloc(support.TESTFN, "wb")
3880 self._check_warn_on_dealloc(support.TESTFN, "w")
3881
3882 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3883 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003884 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003885 for fd in fds:
3886 try:
3887 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003888 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003889 if e.errno != errno.EBADF:
3890 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003891 self.addCleanup(cleanup_fds)
3892 r, w = os.pipe()
3893 fds += r, w
3894 self._check_warn_on_dealloc(r, *args, **kwargs)
3895 # When using closefd=False, there's no warning
3896 r, w = os.pipe()
3897 fds += r, w
Serhiy Storchaka94a619d2016-02-11 13:11:44 +02003898 with support.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00003899 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00003900
3901 def test_warn_on_dealloc_fd(self):
3902 self._check_warn_on_dealloc_fd("rb", buffering=0)
3903 self._check_warn_on_dealloc_fd("rb")
3904 self._check_warn_on_dealloc_fd("r")
3905
3906
Antoine Pitrou243757e2010-11-05 21:15:39 +00003907 def test_pickling(self):
3908 # Pickling file objects is forbidden
3909 for kwargs in [
3910 {"mode": "w"},
3911 {"mode": "wb"},
3912 {"mode": "wb", "buffering": 0},
3913 {"mode": "r"},
3914 {"mode": "rb"},
3915 {"mode": "rb", "buffering": 0},
3916 {"mode": "w+"},
3917 {"mode": "w+b"},
3918 {"mode": "w+b", "buffering": 0},
3919 ]:
3920 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3921 with self.open(support.TESTFN, **kwargs) as f:
3922 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3923
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003924 def test_nonblock_pipe_write_bigbuf(self):
3925 self._test_nonblock_pipe_write(16*1024)
3926
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003927 def test_nonblock_pipe_write_smallbuf(self):
3928 self._test_nonblock_pipe_write(1024)
3929
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003930 @unittest.skipUnless(hasattr(os, 'set_blocking'),
3931 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003932 def _test_nonblock_pipe_write(self, bufsize):
3933 sent = []
3934 received = []
3935 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003936 os.set_blocking(r, False)
3937 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003938
3939 # To exercise all code paths in the C implementation we need
3940 # to play with buffer sizes. For instance, if we choose a
3941 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3942 # then we will never get a partial write of the buffer.
3943 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3944 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3945
3946 with rf, wf:
3947 for N in 9999, 73, 7574:
3948 try:
3949 i = 0
3950 while True:
3951 msg = bytes([i % 26 + 97]) * N
3952 sent.append(msg)
3953 wf.write(msg)
3954 i += 1
3955
3956 except self.BlockingIOError as e:
3957 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003958 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003959 sent[-1] = sent[-1][:e.characters_written]
3960 received.append(rf.read())
3961 msg = b'BLOCKED'
3962 wf.write(msg)
3963 sent.append(msg)
3964
3965 while True:
3966 try:
3967 wf.flush()
3968 break
3969 except self.BlockingIOError as e:
3970 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003971 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003972 self.assertEqual(e.characters_written, 0)
3973 received.append(rf.read())
3974
3975 received += iter(rf.read, None)
3976
3977 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003978 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003979 self.assertTrue(wf.closed)
3980 self.assertTrue(rf.closed)
3981
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003982 def test_create_fail(self):
3983 # 'x' mode fails if file is existing
3984 with self.open(support.TESTFN, 'w'):
3985 pass
3986 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3987
3988 def test_create_writes(self):
3989 # 'x' mode opens for writing
3990 with self.open(support.TESTFN, 'xb') as f:
3991 f.write(b"spam")
3992 with self.open(support.TESTFN, 'rb') as f:
3993 self.assertEqual(b"spam", f.read())
3994
Christian Heimes7b648752012-09-10 14:48:43 +02003995 def test_open_allargs(self):
3996 # there used to be a buffer overflow in the parser for rawmode
3997 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3998
3999
class CMiscIOTest(MiscIOTest):
    # MiscIOTest bound to the ``io`` module, plus checks specific to it.
    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        class BadReader(self.io.BufferedIOBase):
            # read() hands back far more data than any caller asked for.
            def read(self, n=-1):
                return b'x' * 10**6

        reader = BadReader()
        target = bytearray(2)
        with self.assertRaises(ValueError):
            reader.readinto(target)

    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        """Run a child interpreter in which a daemon thread and the main
        thread both write to ``sys.<stream_name>``, then check how the
        child ended."""
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        code = """if 1:
            import sys
            import time
            import threading
            from test.support import SuppressCrashReport

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            crash = SuppressCrashReport()
            crash.__enter__()
            # don't call __exit__(): the crash occurs at Python shutdown

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """.format(stream_name=stream_name)
        res, _ = run_python_until_end("-c", code)
        err = res.err.decode()
        if res.rc == 0:
            # Success: nothing but the written markers may be on stderr.
            self.assertFalse(err.strip('.!'))
            return
        # Failure: should be a fatal error
        fatal_message = ("Fatal Python error: could not acquire lock "
                         "for <_io.BufferedWriter name='<{stream_name}>'> "
                         "at interpreter shutdown, possibly due to "
                         "daemon threads".format(stream_name=stream_name))
        self.assertIn(fatal_message, err)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stdout')

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stderr')
4057
4058
class PyMiscIOTest(MiscIOTest):
    # MiscIOTest bound to the ``pyio`` module.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00004061
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004062
4063@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
4064class SignalsTest(unittest.TestCase):
4065
def setUp(self):
    # Install alarm_interrupt() for SIGALRM and remember the handler it
    # replaces.
    previous = signal.signal(signal.SIGALRM, self.alarm_interrupt)
    self.oldalrm = previous
4068
4069 def tearDown(self):
4070 signal.signal(signal.SIGALRM, self.oldalrm)
4071
def alarm_interrupt(self, sig, frame):
    # Signal handler that always fails with ZeroDivisionError.
    1 / 0
4074
def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
    """Check that a partial write, when it gets interrupted, properly
    invokes the signal handler, and bubbles up the exception raised
    in the latter.

    *item* is repeated to build a payload longer than
    support.PIPE_MAX_SIZE.  *bytes* is the byte string whose first two
    bytes are expected to be read back from the pipe.  The remaining
    keyword arguments go to self.io.open(); closefd is forced to False.
    """
    read_results = []

    def _read():
        # Keep SIGALRM out of this thread where the platform allows it.
        if hasattr(signal, 'pthread_sigmask'):
            signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
        s = os.read(r, 1)
        read_results.append(s)

    t = threading.Thread(target=_read)
    t.daemon = True
    fdopen_kwargs["closefd"] = False
    # Build the payload before the pipe exists, so that a failure here
    # cannot leak file descriptors.
    large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
    r, w = os.pipe()
    # Bound before the try block: the finally clause must not fail with
    # an unbound name (hiding the real error) when open() raises.
    wio = None
    try:
        wio = self.io.open(w, **fdopen_kwargs)
        t.start()
        # Fill the pipe enough that the write will be blocking.
        # It will be interrupted by the timer armed just below.  Since the
        # other thread has read one byte, the low-level write will
        # return with a successful (partial) result rather than an EINTR.
        # The buffered IO layer must check for pending signal
        # handlers, which in this case will invoke alarm_interrupt().
        signal.alarm(1)
        try:
            self.assertRaises(ZeroDivisionError, wio.write, large_data)
        finally:
            signal.alarm(0)
            t.join()
        # We got one byte, get another one and check that it isn't a
        # repeat of the first one.
        read_results.append(os.read(r, 1))
        self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
    finally:
        os.close(w)
        os.close(r)
        # This is deliberate. If we didn't close the file descriptor
        # before closing wio, wio would try to flush its internal
        # buffer, and block again.
        if wio is not None:
            try:
                wio.close()
            except OSError as e:
                if e.errno != errno.EBADF:
                    raise
4120
4121 def test_interrupted_write_unbuffered(self):
4122 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
4123
4124 def test_interrupted_write_buffered(self):
4125 self.check_interrupted_write(b"xy", b"xy", mode="wb")
4126
4127 def test_interrupted_write_text(self):
4128 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
4129
@support.no_tracing
def check_reentrant_write(self, data, **fdopen_kwargs):
    """Write *data* in a loop while a SIGALRM handler writes to the same
    file object, and check which exception ends the loop."""

    def on_alarm(*args):
        # Will be called reentrantly from the same thread
        wio.write(data)
        1/0

    signal.signal(signal.SIGALRM, on_alarm)
    r, w = os.pipe()
    wio = self.io.open(w, **fdopen_kwargs)
    try:
        signal.alarm(1)
        # Either the reentrant call to wio.write() fails with RuntimeError,
        # or the signal handler raises ZeroDivisionError.
        expected = (ZeroDivisionError, RuntimeError)
        with self.assertRaises(expected) as caught:
            while True:
                for _ in range(100):
                    wio.write(data)
                    wio.flush()
                # Make sure the buffer doesn't fill up and block further writes
                os.read(r, len(data) * 100)
        error = caught.exception
        if isinstance(error, RuntimeError):
            message = str(error)
            self.assertTrue(message.startswith("reentrant call"), message)
    finally:
        signal.alarm(0)
        wio.close()
        os.close(r)
4157
4158 def test_reentrant_write_buffered(self):
4159 self.check_reentrant_write(b"xy", mode="wb")
4160
4161 def test_reentrant_write_text(self):
4162 self.check_reentrant_write("xy", mode="w", encoding="ascii")
4163
def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
    """Check that a buffered read, when it gets interrupted (either
    returning a partial result or EINTR), properly invokes the signal
    handler and retries if the latter returned successfully.

    *decode* converts the value returned by read() to str for the
    comparison.  The remaining keyword arguments go to self.io.open();
    closefd is forced to False.
    """
    r, w = os.pipe()
    fdopen_kwargs["closefd"] = False

    def alarm_handler(sig, frame):
        os.write(w, b"bar")

    signal.signal(signal.SIGALRM, alarm_handler)
    # Bound before the try block: the finally clause must not fail with
    # an unbound name (hiding the real error) when open() raises.
    rio = None
    try:
        rio = self.io.open(r, **fdopen_kwargs)
        os.write(w, b"foo")
        signal.alarm(1)
        # Expected behaviour:
        # - first raw read() returns partial b"foo"
        # - second raw read() returns EINTR
        # - third raw read() returns b"bar"
        self.assertEqual(decode(rio.read(6)), "foobar")
    finally:
        signal.alarm(0)
        if rio is not None:
            rio.close()
        os.close(w)
        os.close(r)
4187
Antoine Pitrou20db5112011-08-19 20:32:34 +02004188 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004189 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
4190 mode="rb")
4191
Antoine Pitrou20db5112011-08-19 20:32:34 +02004192 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004193 self.check_interrupted_read_retry(lambda x: x,
4194 mode="r")
4195
def check_interrupted_write_retry(self, item, **fdopen_kwargs):
    """Check that a buffered write, when it gets interrupted (either
    returning a partial result or EINTR), properly invokes the signal
    handler and retries if the latter returned successfully.

    *item* is a one-element str or bytes value; it is repeated
    support.PIPE_MAX_SIZE times to build the payload.  *fdopen_kwargs* are
    forwarded to self.io.open(); "closefd" is forced to False because both
    pipe ends are closed explicitly here.
    """
    select = support.import_module("select")

    # A quantity that exceeds the buffer size of an anonymous pipe's
    # write end.
    N = support.PIPE_MAX_SIZE
    r, w = os.pipe()
    fdopen_kwargs["closefd"] = False

    # We need a separate thread to read from the pipe and allow the
    # write() to finish.  This thread is started after the SIGALRM is
    # received (forcing a first EINTR in write()).
    read_results = []
    write_finished = False
    error = None
    def _read():
        nonlocal error
        try:
            while not write_finished:
                while r in select.select([r], [], [], 1.0)[0]:
                    s = os.read(r, 1024)
                    read_results.append(s)
        except BaseException as exc:
            # Hand the failure to the main thread, which asserts on it.
            error = exc
    t = threading.Thread(target=_read)
    t.daemon = True
    def alarm1(sig, frame):
        # First alarm: re-arm with the handler that starts the reader.
        signal.signal(signal.SIGALRM, alarm2)
        signal.alarm(1)
    def alarm2(sig, frame):
        t.start()

    large_data = item * N
    signal.signal(signal.SIGALRM, alarm1)
    # Bound before the try block so that the cleanup below does not raise
    # UnboundLocalError (hiding the real failure) when open() itself fails.
    wio = None
    try:
        wio = self.io.open(w, **fdopen_kwargs)
        signal.alarm(1)
        # Expected behaviour:
        # - first raw write() is partial (because of the limited pipe buffer
        #   and the first alarm)
        # - second raw write() returns EINTR (because of the second alarm)
        # - subsequent write()s are successful (either partial or complete)
        written = wio.write(large_data)
        self.assertEqual(N, written)

        wio.flush()
        write_finished = True
        t.join()

        self.assertIsNone(error)
        self.assertEqual(N, sum(len(x) for x in read_results))
    finally:
        signal.alarm(0)
        write_finished = True
        os.close(w)
        os.close(r)
        # This is deliberate. If we didn't close the file descriptor
        # before closing wio, wio would try to flush its internal
        # buffer, and could block (in case of failure).
        if wio is not None:
            try:
                wio.close()
            except OSError as e:
                if e.errno != errno.EBADF:
                    raise
4263
Antoine Pitrou20db5112011-08-19 20:32:34 +02004264 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004265 self.check_interrupted_write_retry(b"x", mode="wb")
4266
Antoine Pitrou20db5112011-08-19 20:32:34 +02004267 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004268 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4269
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004270
class CSignalsTest(SignalsTest):
    # Run the shared SignalsTest cases against the `io` module; the tests
    # reach the implementation under test through `self.io`.
    io = io
4273
class PySignalsTest(SignalsTest):
    # Run the shared SignalsTest cases against `pyio`; the tests reach the
    # implementation under test through `self.io`.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled (setting an inherited test to None removes it).
    test_reentrant_write_buffered = test_reentrant_write_text = None
4281
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004282
def load_tests(*args):
    """Build the test suite for this module (unittest load_tests protocol).

    Before the suite is assembled, every test class whose name starts with
    "C" or "Py" gets the public names of the matching io implementation,
    plus the matching C*/Py* mock classes, set as class attributes.  The
    positional arguments supplied by unittest are accepted but not used.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
             SlowFlushRawIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    globs = globals()
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # unittest.makeSuite() is deprecated since Python 3.11 and removed in
    # 3.13; the default loader collects the same "test*" methods.
    loader = unittest.defaultTestLoader
    suite = unittest.TestSuite(
        [loader.loadTestsFromTestCase(test) for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004319
# Run this module's tests through unittest when executed as a script.
if __name__ == "__main__":
    unittest.main()