blob: a7257115fc9089492e3f371fa1f9e5078325397d [file] [log] [blame]
"""Unit tests for the io module."""

# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +030032import warnings
Antoine Pitrou19690592009-06-12 20:14:08 +000033import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000034import signal
35import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000036from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000037from collections import deque
Antoine Pitrou78e761e2012-10-16 22:57:11 +020038from UserList import UserList
Antoine Pitrou19690592009-06-12 20:14:08 +000039from test import test_support as support
Serhiy Storchaka354d50e2013-02-03 17:10:42 +020040import contextlib
Christian Heimes1a6387e2008-03-26 12:49:49 +000041
42import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000043import io # C implementation of io
44import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000045try:
46 import threading
47except ImportError:
48 threading = None
Antoine Pitrou5aa7df32011-11-21 20:16:44 +010049try:
50 import fcntl
51except ImportError:
52 fcntl = None
Antoine Pitrou19690592009-06-12 20:14:08 +000053
54__metaclass__ = type
55bytes = support.py3k_bytes
56
def byteslike(*pos, **kw):
    """Return a memoryview over a freshly built bytearray.

    All positional and keyword arguments are forwarded unchanged to the
    bytearray constructor.
    """
    backing = bytearray(*pos, **kw)
    return memoryview(backing)
59
Antoine Pitrou19690592009-06-12 20:14:08 +000060def _default_chunk_size():
61 """Get the default TextIOWrapper chunk size"""
62 with io.open(__file__, "r", encoding="latin1") as f:
63 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000064
65
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    ``read_stack`` is a sequence of queued results: bytes chunks are handed
    out through readinto(), and a ``None`` entry makes one readinto() call
    return None.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0              # number of readinto() calls
        self._extraneous_reads = 0   # calls made after the queue ran dry

    def write(self, b):
        # Keep an immutable snapshot so later caller-side mutation is harmless.
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            self._read_stack.pop(0)
            return None
        size = len(chunk)
        if size > capacity:
            # Hand out what fits and keep the remainder queued.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        self._read_stack.pop(0)
        buf[:size] = chunk
        return size

    def truncate(self, pos=None):
        return pos
121
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead combined with the C io module's RawIOBase."""
124
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead combined with the pure-Python RawIOBase."""
127
128
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead extended with an explicit read() method."""

    def read(self, n=None):
        """Pop and return the next queued item; *n* is ignored.

        When the queue is empty the call is counted as extraneous and
        b"" is returned.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue means EOF. Anything else (including
            # KeyboardInterrupt) must propagate rather than be swallowed.
            self._extraneous_reads += 1
            return b""
138
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with the C io module's RawIOBase."""
141
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with the pure-Python RawIOBase."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000144
145
class MisbehavedRawIO(MockRawIO):
    """Raw mock whose methods deliberately report bogus results."""

    def write(self, b):
        # Report twice as many bytes as were actually accepted.
        accepted = MockRawIO.write(self, b)
        return accepted * 2

    def read(self, n=None):
        # Return the payload doubled.
        payload = MockRawIO.read(self, n)
        return payload * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Do the real transfer, then report a count larger than the buffer.
        MockRawIO.readinto(self, buf)
        return 5 * len(buf)
162
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with the C io module's RawIOBase."""
165
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with the pure-Python RawIOBase."""
168
169
class CloseFailureIO(MockRawIO):
    """Raw mock whose first close() marks it closed and raises IOError."""

    closed = 0

    def close(self):
        # Later calls are silent no-ops.
        if self.closed:
            return
        self.closed = 1
        raise IOError
177
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with the C io module's RawIOBase."""
180
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with the pure-Python RawIOBase."""
183
184
class MockFileIO:
    """Mixin that records the size of every read()/readinto() result.

    It must be combined with a concrete stream class; the real work is
    delegated to that class through super().
    """

    def __init__(self, data):
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        result = super(MockFileIO, self).read(n)
        if result is None:
            logged = None
        else:
            logged = len(result)
        self.read_history.append(logged)
        return result

    def readinto(self, b):
        count = super(MockFileIO, self).readinto(b)
        self.read_history.append(count)
        return count
Christian Heimes1a6387e2008-03-26 12:49:49 +0000200
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO backed by the C io module's BytesIO."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000203
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO backed by the pure-Python BytesIO."""
206
207
class MockNonBlockWriterIO:
    """Writer mock that can simulate a non-blocking stream which would block.

    Written data is accumulated in chunks; block_on() arms a marker that
    makes write() stop short when the marker shows up in the data.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and forget it."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        payload = bytes(b)
        if self._blocker_char:
            try:
                cut = payload.index(self._blocker_char)
            except ValueError:
                cut = None
            if cut is not None:
                if cut > 0:
                    # write data up to the first blocker
                    self._write_stack.append(payload[:cut])
                    return cut
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(payload)
        return len(payload)
251
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO combined with the C io module's RawIOBase."""

    BlockingIOError = io.BlockingIOError
254
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO combined with the pure-Python RawIOBase."""

    BlockingIOError = pyio.BlockingIOError
257
Christian Heimes1a6387e2008-03-26 12:49:49 +0000258
259class IOTest(unittest.TestCase):
260
Antoine Pitrou19690592009-06-12 20:14:08 +0000261 def setUp(self):
262 support.unlink(support.TESTFN)
263
Christian Heimes1a6387e2008-03-26 12:49:49 +0000264 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000265 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000266
    def write_ops(self, f):
        # Shared exercise for a writable, seekable binary stream ``f``:
        # drives write(), seek(), tell() and truncate() and checks every
        # return value along the way.
        self.assertEqual(f.write(b"blah."), 5)
        # Truncating to 0 is expected to leave the stream position untouched.
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        # whence=1: offset is relative to the current position.
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        buffer = bytearray(b" world\n\n\n")
        self.assertEqual(f.write(buffer), 9)
        buffer[:] = b"*" * 9  # Overwrite our copy of the data
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        # whence=2: offset is relative to the end (14 bytes at this point).
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        # truncate() reports the new size and must not move the position.
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        # Float offsets are rejected.
        self.assertRaises(TypeError, f.seek, 0.0)
290
    def read_ops(self, f, buffered=False):
        # Shared exercise for a readable, seekable binary stream ``f`` whose
        # content is b"hello world\n". With ``buffered`` true the
        # argument-less read() form is checked as well.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        # readinto() must accept any writable bytes-like object.
        data = byteslike(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data.tobytes(), b" worl")
        data = bytearray(5)
        # Only two bytes remain; the buffer itself keeps its length.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        # At EOF both read() and readinto() report nothing.
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(byteslike(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        # Zero-sized requests are valid and transfer nothing.
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(byteslike()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        # Float offsets are rejected.
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")
319
320 LARGE = 2**31
321
322 def large_file_ops(self, f):
323 assert f.readable()
324 assert f.writable()
325 self.assertEqual(f.seek(self.LARGE), self.LARGE)
326 self.assertEqual(f.tell(), self.LARGE)
327 self.assertEqual(f.write(b"xxx"), 3)
328 self.assertEqual(f.tell(), self.LARGE + 3)
329 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
330 self.assertEqual(f.truncate(), self.LARGE + 2)
331 self.assertEqual(f.tell(), self.LARGE + 2)
332 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
333 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000334 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000335 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
336 self.assertEqual(f.seek(-1, 2), self.LARGE)
337 self.assertEqual(f.read(2), b"x")
338
Antoine Pitrou19690592009-06-12 20:14:08 +0000339 def test_invalid_operations(self):
340 # Try writing on a file opened in read mode and vice-versa.
341 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000342 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000343 self.assertRaises(IOError, fp.read)
344 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000345 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000346 self.assertRaises(IOError, fp.write, b"blah")
347 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000348 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000349 self.assertRaises(IOError, fp.write, "blah")
350 self.assertRaises(IOError, fp.writelines, ["blah\n"])
351
Serhiy Storchaka3c9ce742016-07-01 23:34:44 +0300352 def test_open_handles_NUL_chars(self):
353 fn_with_NUL = 'foo\0bar'
354 self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
355
356 bytes_fn = fn_with_NUL.encode('ascii')
357 with warnings.catch_warnings():
358 warnings.simplefilter("ignore", DeprecationWarning)
359 self.assertRaises(TypeError, self.open, bytes_fn, 'w')
360
Christian Heimes1a6387e2008-03-26 12:49:49 +0000361 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000362 with self.open(support.TESTFN, "wb", buffering=0) as f:
363 self.assertEqual(f.readable(), False)
364 self.assertEqual(f.writable(), True)
365 self.assertEqual(f.seekable(), True)
366 self.write_ops(f)
367 with self.open(support.TESTFN, "rb", buffering=0) as f:
368 self.assertEqual(f.readable(), True)
369 self.assertEqual(f.writable(), False)
370 self.assertEqual(f.seekable(), True)
371 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000372
373 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000374 with self.open(support.TESTFN, "wb") as f:
375 self.assertEqual(f.readable(), False)
376 self.assertEqual(f.writable(), True)
377 self.assertEqual(f.seekable(), True)
378 self.write_ops(f)
379 with self.open(support.TESTFN, "rb") as f:
380 self.assertEqual(f.readable(), True)
381 self.assertEqual(f.writable(), False)
382 self.assertEqual(f.seekable(), True)
383 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000384
    def test_readline(self):
        # readline() with and without a size limit, on binary and text files.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            # A limit larger than the line still stops at the newline.
            self.assertEqual(f.readline(10), b"def\n")
            # A limit shorter than the line returns a partial line ...
            self.assertEqual(f.readline(2), b"xy")
            # ... and the next call picks up the rest of it.
            self.assertEqual(f.readline(4), b"zzy\n")
            # An embedded NUL byte does not end the line.
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            # None means "no limit"; the last line has no trailing newline.
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000398
Serhiy Storchaka64aa4df2017-04-19 22:34:58 +0300399 def test_readline_nonsizeable(self):
400 # Issue #30061
401 # Crash when readline() returns an object without __len__
402 class R(self.IOBase):
403 def readline(self):
404 return None
405 self.assertRaises((TypeError, StopIteration), next, R())
406
407 def test_next_nonsizeable(self):
408 # Issue #30061
409 # Crash when next() returns an object without __len__
410 class R(self.IOBase):
411 def next(self):
412 return None
413 self.assertRaises(TypeError, R().readlines, 1)
414
Christian Heimes1a6387e2008-03-26 12:49:49 +0000415 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000416 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000417 self.write_ops(f)
418 data = f.getvalue()
419 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000420 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000421 self.read_ops(f, True)
422
423 def test_large_file_ops(self):
424 # On Windows and Mac OSX this test comsumes large resources; It takes
425 # a long time to build the >2GB file and takes >2GB of disk space
426 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000427 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware1f702212013-12-10 14:09:20 -0600428 support.requires(
429 'largefile',
430 'test requires %s bytes and a long time to run' % self.LARGE)
Antoine Pitrou19690592009-06-12 20:14:08 +0000431 with self.open(support.TESTFN, "w+b", 0) as f:
432 self.large_file_ops(f)
433 with self.open(support.TESTFN, "w+b") as f:
434 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000435
436 def test_with_open(self):
437 for bufsize in (0, 1, 100):
438 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000439 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000440 f.write(b"xxx")
441 self.assertEqual(f.closed, True)
442 f = None
443 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000444 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000445 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000446 except ZeroDivisionError:
447 self.assertEqual(f.closed, True)
448 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000449 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000450
Antoine Pitroue741cc62009-01-21 00:45:36 +0000451 # issue 5008
452 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000453 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000454 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000455 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000456 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000457 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000458 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000459 with self.open(support.TESTFN, "a") as f:
Serhiy Storchakaea4d2872015-08-02 15:19:04 +0300460 self.assertGreater(f.tell(), 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000461
    def test_destructor(self):
        # Dropping the last reference to a FileIO subclass instance must run
        # __del__, then close(), then flush() -- in that order -- and the
        # written data must reach the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                # The base class does not necessarily define __del__.
                try:
                    f = super(MyFileIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyFileIO, self).close()
            def flush(self):
                record.append(3)
                super(MyFileIO, self).flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")
486
    def _check_base_destructor(self, base):
        # Helper: subclass ``base``, drop the only instance and check that
        # __del__, close() and flush() ran in that order while instance
        # attributes were still readable.
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                # The base class does not necessarily define __del__.
                try:
                    f = super(MyIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super(MyIO, self).close()
            def flush(self):
                record.append(self.on_flush)
                super(MyIO, self).flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
516
Antoine Pitrou19690592009-06-12 20:14:08 +0000517 def test_IOBase_destructor(self):
518 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000519
Antoine Pitrou19690592009-06-12 20:14:08 +0000520 def test_RawIOBase_destructor(self):
521 self._check_base_destructor(self.RawIOBase)
522
523 def test_BufferedIOBase_destructor(self):
524 self._check_base_destructor(self.BufferedIOBase)
525
526 def test_TextIOBase_destructor(self):
527 self._check_base_destructor(self.TextIOBase)
528
529 def test_close_flushes(self):
530 with self.open(support.TESTFN, "wb") as f:
531 f.write(b"xxx")
532 with self.open(support.TESTFN, "rb") as f:
533 self.assertEqual(f.read(), b"xxx")
534
535 def test_array_writes(self):
536 a = array.array(b'i', range(10))
537 n = len(a.tostring())
538 with self.open(support.TESTFN, "wb", 0) as f:
539 self.assertEqual(f.write(a), n)
540 with self.open(support.TESTFN, "wb") as f:
541 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000542
543 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000544 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000545 closefd=False)
546
Antoine Pitrou19690592009-06-12 20:14:08 +0000547 def test_read_closed(self):
548 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000549 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000550 with self.open(support.TESTFN, "r") as f:
551 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000552 self.assertEqual(file.read(), "egg\n")
553 file.seek(0)
554 file.close()
555 self.assertRaises(ValueError, file.read)
556
557 def test_no_closefd_with_filename(self):
558 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000559 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000560
561 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000562 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000563 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000564 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000565 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000566 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000567 self.assertEqual(file.buffer.raw.closefd, False)
568
    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Self-reference: the object now sits in a reference cycle.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        # The weak reference must be dead once the cycle has been collected.
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000581
Antoine Pitrou19690592009-06-12 20:14:08 +0000582 def test_unbounded_file(self):
583 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
584 zero = "/dev/zero"
585 if not os.path.exists(zero):
586 self.skipTest("{0} does not exist".format(zero))
587 if sys.maxsize > 0x7FFFFFFF:
588 self.skipTest("test can only run in a 32-bit address space")
589 if support.real_max_memuse < support._2G:
590 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000591 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000592 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000593 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000594 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000595 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000596 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000597
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200598 def check_flush_error_on_close(self, *args, **kwargs):
599 # Test that the file is closed despite failed flush
600 # and that flush() is called before file closed.
601 f = self.open(*args, **kwargs)
602 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000603 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200604 closed[:] = [f.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000605 raise IOError()
606 f.flush = bad_flush
607 self.assertRaises(IOError, f.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600608 self.assertTrue(f.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200609 self.assertTrue(closed) # flush() called
610 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchaka437d5352015-02-23 00:28:38 +0200611 f.flush = lambda: None # break reference loop
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200612
613 def test_flush_error_on_close(self):
614 # raw file
615 # Issue #5700: io.FileIO calls flush() after file closed
616 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
617 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
618 self.check_flush_error_on_close(fd, 'wb', buffering=0)
619 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
620 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
621 os.close(fd)
622 # buffered io
623 self.check_flush_error_on_close(support.TESTFN, 'wb')
624 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
625 self.check_flush_error_on_close(fd, 'wb')
626 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
627 self.check_flush_error_on_close(fd, 'wb', closefd=False)
628 os.close(fd)
629 # text io
630 self.check_flush_error_on_close(support.TESTFN, 'w')
631 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
632 self.check_flush_error_on_close(fd, 'w')
633 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
634 self.check_flush_error_on_close(fd, 'w', closefd=False)
635 os.close(fd)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000636
637 def test_multi_close(self):
638 f = self.open(support.TESTFN, "wb", buffering=0)
639 f.close()
640 f.close()
641 f.close()
642 self.assertRaises(ValueError, f.flush)
643
Antoine Pitrou6391b342010-09-14 18:48:19 +0000644 def test_RawIOBase_read(self):
645 # Exercise the default RawIOBase.read() implementation (which calls
646 # readinto() internally).
647 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
648 self.assertEqual(rawio.read(2), b"ab")
649 self.assertEqual(rawio.read(2), b"c")
650 self.assertEqual(rawio.read(2), b"d")
651 self.assertEqual(rawio.read(2), None)
652 self.assertEqual(rawio.read(2), b"ef")
653 self.assertEqual(rawio.read(2), b"g")
654 self.assertEqual(rawio.read(2), None)
655 self.assertEqual(rawio.read(2), b"")
656
    def test_fileio_closefd(self):
        # Issue #4841
        # A FileIO created with closefd=False must never close a descriptor
        # it merely borrows -- neither when it is re-initialised nor when it
        # is closed. The readline() calls would fail on a closed descriptor.
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()
668
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +0300669 def test_nonbuffered_textio(self):
670 with warnings.catch_warnings(record=True) as recorded:
671 with self.assertRaises(ValueError):
672 self.open(support.TESTFN, 'w', buffering=0)
673 support.gc_collect()
674 self.assertEqual(recorded, [])
675
676 def test_invalid_newline(self):
677 with warnings.catch_warnings(record=True) as recorded:
678 with self.assertRaises(ValueError):
679 self.open(support.TESTFN, 'w', newline='invalid')
680 support.gc_collect()
681 self.assertEqual(recorded, [])
682
Martin Panterc9813d82016-06-03 05:59:20 +0000683 def test_buffered_readinto_mixin(self):
684 # Test the implementation provided by BufferedIOBase
685 class Stream(self.BufferedIOBase):
686 def read(self, size):
687 return b"12345"
688 stream = Stream()
689 buffer = byteslike(5)
690 self.assertEqual(stream.readinto(buffer), 5)
691 self.assertEqual(buffer.tobytes(), b"12345")
692
Hynek Schlawack877effc2012-05-25 09:24:18 +0200693
class CIOTest(IOTest):
    # IOTest variant that adds a regression test for the C implementation.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        # Self-reference puts the instance in a cycle.
        obj.obj = obj
        wr = weakref.ref(obj)
        # Drop both the class and the instance so one collection reclaims
        # them together.
        del MyIO
        del obj
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000713
class PyIOTest(IOTest):
    # Same tests as IOTest, except that the array write test is skipped.
    _skip_array_writes = unittest.skip(
        "len(array.array) returns number of elements rather than bytelength"
    )
    test_array_writes = _skip_array_writes(IOTest.test_array_writes)
    # The decorator was only a temporary; keep the class namespace clean.
    del _skip_array_writes
Christian Heimes1a6387e2008-03-26 12:49:49 +0000718
719
Antoine Pitrou19690592009-06-12 20:14:08 +0000720class CommonBufferedTests:
721 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
722
723 def test_detach(self):
724 raw = self.MockRawIO()
725 buf = self.tp(raw)
726 self.assertIs(buf.detach(), raw)
727 self.assertRaises(ValueError, buf.detach)
728
Benjamin Peterson53ae6142014-12-21 20:51:50 -0600729 repr(buf) # Should still work
730
Antoine Pitrou19690592009-06-12 20:14:08 +0000731 def test_fileno(self):
732 rawio = self.MockRawIO()
733 bufio = self.tp(rawio)
734
Ezio Melotti2623a372010-11-21 13:34:58 +0000735 self.assertEqual(42, bufio.fileno())
Antoine Pitrou19690592009-06-12 20:14:08 +0000736
Antoine Pitrou19690592009-06-12 20:14:08 +0000737 def test_invalid_args(self):
738 rawio = self.MockRawIO()
739 bufio = self.tp(rawio)
740 # Invalid whence
741 self.assertRaises(ValueError, bufio.seek, 0, -1)
742 self.assertRaises(ValueError, bufio.seek, 0, 3)
743
    def test_override_destructor(self):
        # Dropping the last reference to an instance of a subclass of the
        # buffered type under test must run __del__ and then close();
        # flush() is expected as a third step only for writable streams.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                # The base class does not necessarily define __del__.
                try:
                    f = super(MyBufferedIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyBufferedIO, self).close()
            def flush(self):
                record.append(3)
                super(MyBufferedIO, self).flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        # Query this before the object goes away.
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])
771
772 def test_context_manager(self):
773 # Test usability as a context manager
774 rawio = self.MockRawIO()
775 bufio = self.tp(rawio)
776 def _with():
777 with bufio:
778 pass
779 _with()
780 # bufio should now be closed, and using it a second time should raise
781 # a ValueError.
782 self.assertRaises(ValueError, _with)
783
784 def test_error_through_destructor(self):
785 # Test that the exception state is not modified by a destructor,
786 # even if close() fails.
787 rawio = self.CloseFailureIO()
788 def f():
789 self.tp(rawio).xyzzy
790 with support.captured_output("stderr") as s:
791 self.assertRaises(AttributeError, f)
792 s = s.getvalue().strip()
793 if s:
794 # The destructor *may* have printed an unraisable error, check it
795 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000796 self.assertTrue(s.startswith("Exception IOError: "), s)
797 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000798
799 def test_repr(self):
800 raw = self.MockRawIO()
801 b = self.tp(raw)
802 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
803 self.assertEqual(repr(b), "<%s>" % clsname)
804 raw.name = "dummy"
805 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
806 raw.name = b"dummy"
807 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000808
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000809 def test_flush_error_on_close(self):
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200810 # Test that buffered file is closed despite failed flush
811 # and that flush() is called before file closed.
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000812 raw = self.MockRawIO()
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200813 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000814 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200815 closed[:] = [b.closed, raw.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000816 raise IOError()
817 raw.flush = bad_flush
818 b = self.tp(raw)
819 self.assertRaises(IOError, b.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600820 self.assertTrue(b.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200821 self.assertTrue(raw.closed)
822 self.assertTrue(closed) # flush() called
823 self.assertFalse(closed[0]) # flush() called before file closed
824 self.assertFalse(closed[1])
Serhiy Storchaka437d5352015-02-23 00:28:38 +0200825 raw.flush = lambda: None # break reference loop
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600826
827 def test_close_error_on_close(self):
828 raw = self.MockRawIO()
829 def bad_flush():
830 raise IOError('flush')
831 def bad_close():
832 raise IOError('close')
833 raw.close = bad_close
834 b = self.tp(raw)
835 b.flush = bad_flush
836 with self.assertRaises(IOError) as err: # exception not swallowed
837 b.close()
838 self.assertEqual(err.exception.args, ('close',))
839 self.assertFalse(b.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000840
841 def test_multi_close(self):
842 raw = self.MockRawIO()
843 b = self.tp(raw)
844 b.close()
845 b.close()
846 b.close()
847 self.assertRaises(ValueError, b.flush)
848
Antoine Pitroufc9ead62010-12-21 21:26:55 +0000849 def test_readonly_attributes(self):
850 raw = self.MockRawIO()
851 buf = self.tp(raw)
852 x = self.MockRawIO()
853 with self.assertRaises((AttributeError, TypeError)):
854 buf.raw = x
855
Christian Heimes1a6387e2008-03-26 12:49:49 +0000856
class SizeofTest:
    # Mixin checking how sys.getsizeof() of a buffered object varies with
    # the requested buffer size.

    @support.cpython_only
    def test_sizeof(self):
        small_buffer = 4096
        large_buffer = 8192
        # Size of the object once the first buffer size is subtracted.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream, buffer_size=small_buffer)
        overhead = sys.getsizeof(buffered) - small_buffer
        # A second object with a larger buffer must differ by exactly the
        # buffer size.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream, buffer_size=large_buffer)
        self.assertEqual(sys.getsizeof(buffered), overhead + large_buffer)
869
870
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Reader tests run against whatever class ``self.tp`` names.
    read_mode = "rb"

    def test_constructor(self):
        raw_stream = self.MockRawIO([b"abc"])
        reader = self.tp(raw_stream)
        # Re-run __init__ on the live object with valid arguments.
        reader.__init__(raw_stream)
        for valid_size in (1024, 16):
            reader.__init__(raw_stream, buffer_size=valid_size)
        self.assertEqual(b"abc", reader.read())
        # Zero and negative buffer sizes are rejected.
        for invalid_size in (0, -16, -1):
            self.assertRaises(ValueError, reader.__init__, raw_stream,
                              buffer_size=invalid_size)
        raw_stream = self.MockRawIO([b"abc"])
        reader.__init__(raw_stream)
        self.assertEqual(b"abc", reader.read())

    def test_uninitialized(self):
        # Create an object without running __init__ and drop it at once.
        discarded = self.tp.__new__(self.tp)
        del discarded
        reader = self.tp.__new__(self.tp)
        self.assertRaisesRegexp((ValueError, AttributeError),
                                'uninitialized|has no attribute',
                                reader.read, 0)
        reader.__init__(self.MockRawIO())
        self.assertEqual(reader.read(0), b'')

    def test_read(self):
        for size in (None, 7):
            raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
            reader = self.tp(raw_stream)
            self.assertEqual(b"abcdefg", reader.read(size))
        # Invalid args
        self.assertRaises(ValueError, reader.read, -2)

    def test_read1(self):
        raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw_stream)
        self.assertEqual(b"a", reader.read(1))
        # (expected data, requested size, raw reads counted afterwards)
        steps = (
            (b"b", 1, 1),
            (b"c", 100, 1),
            (b"d", 100, 2),
            (b"efg", 100, 3),
            (b"", 100, 4),
        )
        for expected, size, raw_reads in steps:
            self.assertEqual(expected, reader.read1(size))
            self.assertEqual(raw_stream._reads, raw_reads)
        # Invalid args
        self.assertRaises(ValueError, reader.read1, -1)

    def test_readinto(self):
        raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw_stream)
        target = bytearray(2)
        # (bytes read, buffer content afterwards); the trailing "f" is a
        # leftover from the previous fill.
        steps = ((2, b"ab"), (2, b"cd"), (2, b"ef"), (1, b"gf"), (0, b"gf"))
        for count_read, content in steps:
            self.assertEqual(reader.readinto(target), count_read)
            self.assertEqual(target, content)

    def test_readlines(self):
        def fresh_reader():
            raw_stream = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(raw_stream)

        all_lines = [b"abc\n", b"d\n", b"ef"]
        self.assertEqual(fresh_reader().readlines(), all_lines)
        self.assertEqual(fresh_reader().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(fresh_reader().readlines(None), all_lines)

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # [buffer size, sizes passed to bufio.read(), expected raw reads]
        tests = [
            [100, [3, 1, 4, 8], [dlen, 0]],
            [100, [3, 3, 3], [dlen]],
            [4, [1, 2, 4, 2], [4, 4, 1]],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            raw_stream = self.MockFileIO(data)
            reader = self.tp(raw_stream, buffer_size=bufsize)
            offset = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(reader.read(nbytes),
                                 data[offset:offset + nbytes])
                offset += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(raw_stream.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        raw_stream = self.MockRawIO(
            (b"abc", b"d", None, b"efg", None, None, None))
        reader = self.tp(raw_stream)
        self.assertEqual(b"abcd", reader.read(6))
        self.assertEqual(b"e", reader.read(1))
        self.assertEqual(b"fg", reader.read())
        self.assertEqual(b"", reader.peek(1))
        self.assertIsNone(reader.read())
        self.assertEqual(b"", reader.read())

        # Same idea on the raw object itself, through readall().
        raw_stream = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", raw_stream.readall())
        self.assertIsNone(raw_stream.readall())

    def test_read_past_eof(self):
        raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw_stream)

        self.assertEqual(b"abcdefg", reader.read(9000))

    def test_read_all(self):
        raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw_stream)

        self.assertEqual(b"abcdefg", reader.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode,
                           buffering=0) as raw:
                reader = self.tp(raw, 8)
                errors = []
                results = []

                def read_until_eof():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for size in cycle([1, 19]):
                            chunk = reader.read(size)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise

                threads = [threading.Thread(target=read_until_eof)
                           for _ in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02)  # yield
                self.assertFalse(
                    errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                # Every byte value must show up exactly N times overall.
                for value in range(256):
                    needle = bytes(bytearray([value]))
                    self.assertEqual(combined.count(needle), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        raw_stream = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw_stream)
        self.assertRaises(IOError, reader.seek, 0)
        self.assertRaises(IOError, reader.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            # First the simple case, where one raw read is enough to
            # satisfy the request; then a more complex case where two raw
            # reads are needed.
            for chunks in ([b"x" * n], [b"x" * (n - 1), b"x"]):
                raw_stream = self.MockRawIO(chunks)
                reader = self.tp(raw_stream, bufsize)
                self.assertEqual(reader.read(n), b"x" * n)
                self.assertEqual(
                    raw_stream._extraneous_reads, 0,
                    "failed for {}: {} != 0".format(
                        n, raw_stream._extraneous_reads))
1059
1060
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the reader tests against io.BufferedReader.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            raw_stream = self.MockRawIO()
            reader = self.tp(raw_stream)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                              reader.__init__, raw_stream, sys.maxsize)

    def test_initialization(self):
        raw_stream = self.MockRawIO([b"abc"])
        reader = self.tp(raw_stream)
        # After each rejected __init__, read() must raise ValueError too.
        for invalid_size in (0, -16, -1):
            self.assertRaises(ValueError, reader.__init__, raw_stream,
                              buffer_size=invalid_size)
            self.assertRaises(ValueError, reader.read)

    def test_misbehaved_io_read(self):
        raw_stream = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw_stream)
        # _pyio.BufferedReader seems to implement reading differently, so
        # that checking this is not so easy.
        self.assertRaises(IOError, reader.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends up in gc.garbage instead
        self.addCleanup(support.unlink, support.TESTFN)
        raw_stream = self.FileIO(support.TESTFN, "w+b")
        reader = self.tp(raw_stream)
        reader.f = reader  # self-reference: only the GC can reclaim it
        ref = weakref.ref(reader)
        del reader
        support.gc_collect()
        self.assertIsNone(ref(), ref)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegexp(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1107
1108
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the reader tests against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001111
1112
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Writer tests run against whatever class ``self.tp`` names.
    write_mode = "wb"

    def test_constructor(self):
        raw_stream = self.MockRawIO()
        writer = self.tp(raw_stream)
        # Re-run __init__ on the live object with valid arguments.
        writer.__init__(raw_stream)
        for valid_size in (1024, 16):
            writer.__init__(raw_stream, buffer_size=valid_size)
        self.assertEqual(3, writer.write(b"abc"))
        writer.flush()
        # Zero and negative buffer sizes are rejected.
        for invalid_size in (0, -16, -1):
            self.assertRaises(ValueError, writer.__init__, raw_stream,
                              buffer_size=invalid_size)
        writer.__init__(raw_stream)
        self.assertEqual(3, writer.write(b"ghi"))
        writer.flush()
        self.assertEqual(b"".join(raw_stream._write_stack), b"abcghi")

    def test_uninitialized(self):
        # Create an object without running __init__ and drop it at once.
        discarded = self.tp.__new__(self.tp)
        del discarded
        writer = self.tp.__new__(self.tp)
        self.assertRaisesRegexp((ValueError, AttributeError),
                                'uninitialized|has no attribute',
                                writer.write, b'')
        writer.__init__(self.MockRawIO())
        self.assertEqual(writer.write(b''), 0)

    def test_detach_flush(self):
        raw_stream = self.MockRawIO()
        writer = self.tp(raw_stream)
        writer.write(b"howdy!")
        # Nothing reaches the raw stream until detach() is called.
        self.assertFalse(raw_stream._write_stack)
        writer.detach()
        self.assertEqual(raw_stream._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        raw_stream = self.MockRawIO()
        writer = self.tp(raw_stream, 8)
        writer.write(b"abc")
        self.assertFalse(raw_stream._write_stack)
        payload = bytearray(b"def")
        writer.write(payload)
        payload[:] = b"***"  # Overwrite our copy of the data
        writer.flush()
        self.assertEqual(b"".join(raw_stream._write_stack), b"abcdef")

    def test_write_overflow(self):
        raw_stream = self.MockRawIO()
        writer = self.tp(raw_stream, 8)
        contents = b"abcdefghijklmnop"
        for start in range(0, len(contents), 3):
            writer.write(contents[start:start + 3])
        flushed = b"".join(raw_stream._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is called after every single write.
        contents = bytes(range(256)) * 1000
        raw_stream = self.MockRawIO()
        writer = self.tp(raw_stream, 13)
        # Write sizes: repeat each N 15 times then proceed to N+1
        sizes = (size for size in count(1) for _ in range(15))
        written = 0
        while written < len(contents):
            size = min(next(sizes), len(contents) - written)
            self.assertEqual(
                writer.write(contents[written:written + size]), size)
            intermediate_func(writer)
            written += size
        writer.flush()
        self.assertEqual(contents, b"".join(raw_stream._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        def seek_absolute(bufio):
            # Move around with absolute seeks and come back.
            here = bufio.tell()
            bufio.seek(here + 1, 0)
            bufio.seek(here - 1, 0)
            bufio.seek(here, 0)

        self.check_writes(seek_absolute)

        def seek_relative(bufio):
            # Same dance with relative seeks, restoring the position last.
            here = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(here, 0)

        self.check_writes(seek_relative)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw_stream = self.MockNonBlockWriterIO()
        writer = self.tp(raw_stream, 8)

        self.assertEqual(writer.write(b"abcd"), 4)
        self.assertEqual(writer.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw_stream.block_on(b"k")
        self.assertEqual(writer.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be
        # lost
        raw_stream.block_on(b"0")
        try:
            writer.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as exc:
            written = exc.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw_stream.pop_written(),
                         b"abcdefghijklmnopqrwxyz")

        self.assertEqual(writer.write(b"ABCDEFGHI"), 9)
        flushed = raw_stream.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(flushed.startswith(b"01234567A"), flushed)

    def test_write_and_rewind(self):
        raw_stream = io.BytesIO()
        writer = self.tp(raw_stream, 4)
        self.assertEqual(writer.write(b"abcdef"), 6)
        self.assertEqual(writer.tell(), 6)
        # Go back to the start and overwrite the first two bytes.
        writer.seek(0, 0)
        self.assertEqual(writer.write(b"XY"), 2)
        writer.seek(6, 0)
        self.assertEqual(raw_stream.getvalue(), b"XYcdef")
        self.assertEqual(writer.write(b"123456"), 6)
        writer.flush()
        self.assertEqual(raw_stream.getvalue(), b"XYcdef123456")

    def test_flush(self):
        raw_stream = self.MockRawIO()
        writer = self.tp(raw_stream, 8)
        writer.write(b"abc")
        writer.flush()
        self.assertEqual(b"abc", raw_stream._write_stack[0])

    def test_writelines(self):
        lines = [b'ab', b'cd', b'ef']
        raw_stream = self.MockRawIO()
        writer = self.tp(raw_stream, 8)
        writer.writelines(lines)
        writer.flush()
        self.assertEqual(b''.join(raw_stream._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        lines = UserList([b'ab', b'cd', b'ef'])
        raw_stream = self.MockRawIO()
        writer = self.tp(raw_stream, 8)
        writer.writelines(lines)
        writer.flush()
        self.assertEqual(b''.join(raw_stream._write_stack), b'abcdef')

    def test_writelines_error(self):
        raw_stream = self.MockRawIO()
        writer = self.tp(raw_stream, 8)
        for bad_lines in ([1, 2, 3], None):
            self.assertRaises(TypeError, writer.writelines, bad_lines)

    def test_destructor(self):
        raw_stream = self.MockRawIO()
        writer = self.tp(raw_stream, 8)
        writer.write(b"abc")
        # Dropping the only reference must push the data to the raw stream.
        del writer
        support.gc_collect()
        self.assertEqual(b"abc", raw_stream._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode,
                       buffering=0) as raw:
            writer = self.tp(raw, 8)
            writer.write(b"abcdef")
            self.assertEqual(writer.truncate(3), 3)
            self.assertEqual(writer.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as result:
            self.assertEqual(result.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            queue = deque()
            start = 0
            while start < len(contents):
                size = next(sizes)
                queue.append(contents[start:start + size])
                start += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode,
                           buffering=0) as raw:
                writer = self.tp(raw, 8)
                errors = []

                def drain_queue():
                    try:
                        while True:
                            try:
                                chunk = queue.popleft()
                            except IndexError:
                                return
                            writer.write(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise

                threads = [threading.Thread(target=drain_queue)
                           for _ in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02)  # yield
                self.assertFalse(
                    errors,
                    "the following exceptions were caught: %r" % errors)
                writer.close()
            with self.open(support.TESTFN, "rb") as result:
                written = result.read()
            # Every byte value must show up exactly N times in the file.
            for value in range(256):
                self.assertEqual(written.count(bytes([value])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        raw_stream = self.MisbehavedRawIO()
        writer = self.tp(raw_stream, 5)
        self.assertRaises(IOError, writer.seek, 0)
        self.assertRaises(IOError, writer.tell)
        self.assertRaises(IOError, writer.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        expected_warning = ("max_buffer_size is deprecated",
                            DeprecationWarning)
        with support.check_warnings(expected_warning):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw_stream = self.MockRawIO()

        def bad_write(b):
            raise IOError()

        raw_stream.write = bad_write
        writer = self.tp(raw_stream)
        writer.write(b'spam')
        self.assertRaises(IOError, writer.close)  # exception not swallowed
        self.assertTrue(writer.closed)
1374
Antoine Pitrou19690592009-06-12 20:14:08 +00001375
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the writer tests against io.BufferedWriter.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            raw_stream = self.MockRawIO()
            writer = self.tp(raw_stream)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                              writer.__init__, raw_stream, sys.maxsize)

    def test_initialization(self):
        raw_stream = self.MockRawIO()
        writer = self.tp(raw_stream)
        # After each rejected __init__, write() must raise ValueError too.
        for invalid_size in (0, -16, -1):
            self.assertRaises(ValueError, writer.__init__, raw_stream,
                              buffer_size=invalid_size)
            self.assertRaises(ValueError, writer.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends up in gc.garbage instead
        self.addCleanup(support.unlink, support.TESTFN)
        raw_stream = self.FileIO(support.TESTFN, "w+b")
        writer = self.tp(raw_stream)
        writer.write(b"123xxx")
        writer.x = writer  # self-reference: only the GC can reclaim it
        ref = weakref.ref(writer)
        del writer
        support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as result:
            self.assertEqual(result.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1419
Antoine Pitrou19690592009-06-12 20:14:08 +00001420
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the writer tests against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001423
1424class BufferedRWPairTest(unittest.TestCase):
1425
def test_constructor(self):
    # A pair built from two raw streams starts out open.
    reader_raw = self.MockRawIO()
    writer_raw = self.MockRawIO()
    pair = self.tp(reader_raw, writer_raw)
    self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001429
def test_uninitialized(self):
    # Create an object without running __init__ and drop it at once.
    discarded = self.tp.__new__(self.tp)
    del discarded
    pair = self.tp.__new__(self.tp)
    # Both directions must fail until __init__ has run.
    for method, argument in ((pair.read, 0), (pair.write, b'')):
        self.assertRaisesRegexp((ValueError, AttributeError),
                                'uninitialized|has no attribute',
                                method, argument)
    pair.__init__(self.MockRawIO(), self.MockRawIO())
    self.assertEqual(pair.read(0), b'')
    self.assertEqual(pair.write(b''), 0)
1443
Antoine Pitrou19690592009-06-12 20:14:08 +00001444 def test_detach(self):
1445 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1446 self.assertRaises(self.UnsupportedOperation, pair.detach)
1447
1448 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001449 with support.check_warnings(("max_buffer_size is deprecated",
1450 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001451 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001452
1453 def test_constructor_with_not_readable(self):
1454 class NotReadable(MockRawIO):
1455 def readable(self):
1456 return False
1457
1458 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1459
1460 def test_constructor_with_not_writeable(self):
1461 class NotWriteable(MockRawIO):
1462 def writable(self):
1463 return False
1464
1465 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1466
1467 def test_read(self):
1468 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1469
1470 self.assertEqual(pair.read(3), b"abc")
1471 self.assertEqual(pair.read(1), b"d")
1472 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001473 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1474 self.assertEqual(pair.read(None), b"abc")
1475
1476 def test_readlines(self):
1477 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1478 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1479 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1480 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001481
1482 def test_read1(self):
1483 # .read1() is delegated to the underlying reader object, so this test
1484 # can be shallow.
1485 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1486
1487 self.assertEqual(pair.read1(3), b"abc")
1488
1489 def test_readinto(self):
1490 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1491
Martin Panterc9813d82016-06-03 05:59:20 +00001492 data = byteslike(5)
Antoine Pitrou19690592009-06-12 20:14:08 +00001493 self.assertEqual(pair.readinto(data), 5)
Martin Panterc9813d82016-06-03 05:59:20 +00001494 self.assertEqual(data.tobytes(), b"abcde")
Antoine Pitrou19690592009-06-12 20:14:08 +00001495
1496 def test_write(self):
1497 w = self.MockRawIO()
1498 pair = self.tp(self.MockRawIO(), w)
1499
1500 pair.write(b"abc")
1501 pair.flush()
Martin Panterc9813d82016-06-03 05:59:20 +00001502 buffer = bytearray(b"def")
1503 pair.write(buffer)
1504 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitrou19690592009-06-12 20:14:08 +00001505 pair.flush()
1506 self.assertEqual(w._write_stack, [b"abc", b"def"])
1507
1508 def test_peek(self):
1509 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1510
1511 self.assertTrue(pair.peek(3).startswith(b"abc"))
1512 self.assertEqual(pair.read(3), b"abc")
1513
1514 def test_readable(self):
1515 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1516 self.assertTrue(pair.readable())
1517
1518 def test_writeable(self):
1519 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1520 self.assertTrue(pair.writable())
1521
1522 def test_seekable(self):
1523 # BufferedRWPairs are never seekable, even if their readers and writers
1524 # are.
1525 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1526 self.assertFalse(pair.seekable())
1527
1528 # .flush() is delegated to the underlying writer object and has been
1529 # tested in the test_write method.
1530
1531 def test_close_and_closed(self):
1532 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1533 self.assertFalse(pair.closed)
1534 pair.close()
1535 self.assertTrue(pair.closed)
1536
Serhiy Storchakaf95a57f2015-03-24 23:23:42 +02001537 def test_reader_close_error_on_close(self):
1538 def reader_close():
1539 reader_non_existing
1540 reader = self.MockRawIO()
1541 reader.close = reader_close
1542 writer = self.MockRawIO()
1543 pair = self.tp(reader, writer)
1544 with self.assertRaises(NameError) as err:
1545 pair.close()
1546 self.assertIn('reader_non_existing', str(err.exception))
1547 self.assertTrue(pair.closed)
1548 self.assertFalse(reader.closed)
1549 self.assertTrue(writer.closed)
1550
1551 def test_writer_close_error_on_close(self):
1552 def writer_close():
1553 writer_non_existing
1554 reader = self.MockRawIO()
1555 writer = self.MockRawIO()
1556 writer.close = writer_close
1557 pair = self.tp(reader, writer)
1558 with self.assertRaises(NameError) as err:
1559 pair.close()
1560 self.assertIn('writer_non_existing', str(err.exception))
1561 self.assertFalse(pair.closed)
1562 self.assertTrue(reader.closed)
1563 self.assertFalse(writer.closed)
1564
1565 def test_reader_writer_close_error_on_close(self):
1566 def reader_close():
1567 reader_non_existing
1568 def writer_close():
1569 writer_non_existing
1570 reader = self.MockRawIO()
1571 reader.close = reader_close
1572 writer = self.MockRawIO()
1573 writer.close = writer_close
1574 pair = self.tp(reader, writer)
1575 with self.assertRaises(NameError) as err:
1576 pair.close()
1577 self.assertIn('reader_non_existing', str(err.exception))
1578 self.assertFalse(pair.closed)
1579 self.assertFalse(reader.closed)
1580 self.assertFalse(writer.closed)
1581
Antoine Pitrou19690592009-06-12 20:14:08 +00001582 def test_isatty(self):
1583 class SelectableIsAtty(MockRawIO):
1584 def __init__(self, isatty):
1585 MockRawIO.__init__(self)
1586 self._isatty = isatty
1587
1588 def isatty(self):
1589 return self._isatty
1590
1591 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1592 self.assertFalse(pair.isatty())
1593
1594 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1595 self.assertTrue(pair.isatty())
1596
1597 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1598 self.assertTrue(pair.isatty())
1599
1600 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1601 self.assertTrue(pair.isatty())
1602
Benjamin Peterson1c873bf2014-09-29 22:46:57 -04001603 def test_weakref_clearing(self):
1604 brw = self.tp(self.MockRawIO(), self.MockRawIO())
1605 ref = weakref.ref(brw)
1606 brw = None
1607 ref = None # Shouldn't segfault.
1608
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the tests inherited from BufferedRWPairTest with ``tp`` bound to
    # io.BufferedRWPair.
    tp = io.BufferedRWPair
1611
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the tests inherited from BufferedRWPairTest with ``tp`` bound to
    # pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001614
1615
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Tests for BufferedRandom: both inherited test suites are run, plus
    # checks specific to mixing reads, writes and seeks on one object.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_uninitialized(self):
        BufferedReaderTest.test_uninitialized(self)
        BufferedWriterTest.test_uninitialized(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        stream = self.tp(raw, 8)

        self.assertEqual(b"as", stream.read(2))
        stream.write(b"ddd")
        stream.write(b"eee")
        self.assertFalse(raw._write_stack)  # Buffer writes
        # The following read forces the pending writes out to the raw stream.
        self.assertEqual(b"ghjk", stream.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        stream = self.tp(raw)

        self.assertEqual(b"as", stream.read(2))
        self.assertEqual(2, stream.tell())
        stream.seek(0, 0)
        self.assertEqual(b"asdf", stream.read(4))

        # Overwrite four bytes in the middle, then re-read from the start.
        stream.write(b"123f")
        stream.seek(0, 0)
        self.assertEqual(b"asdf123fl", stream.read())
        self.assertEqual(9, stream.tell())
        # Seek relative to the end, then relative to the current position.
        stream.seek(-4, 2)
        self.assertEqual(5, stream.tell())
        stream.seek(2, 1)
        self.assertEqual(7, stream.tell())
        self.assertEqual(b"fl", stream.read(11))
        stream.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        self.assertRaises(TypeError, stream.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario: ``read_func(stream[, n])`` must return the next
        # bytes of the stream and advance the position, whichever read
        # primitive it wraps.
        raw = self.BytesIO(b"abcdefghi")
        stream = self.tp(raw)

        self.assertEqual(b"ab", read_func(stream, 2))
        stream.write(b"12")
        self.assertEqual(b"ef", read_func(stream, 2))
        self.assertEqual(6, stream.tell())
        stream.flush()
        self.assertEqual(6, stream.tell())
        self.assertEqual(b"ghi", read_func(stream))
        # Modify the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(stream, 3))

    def test_flush_and_read(self):
        def _plain_read(stream, *args):
            return stream.read(*args)
        self.check_flush_and_read(_plain_read)

    def test_flush_and_readinto(self):
        def _readinto(stream, size=-1):
            # Without an explicit size, use a buffer larger than the stream.
            target = bytearray(size if size >= 0 else 9999)
            count = stream.readinto(target)
            return bytes(target[:count])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(stream, size=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = stream.peek(size)
            if size != -1:
                data = data[:size]
            # peek() does not advance the position, so do it by hand.
            stream.seek(len(data), 1)
            return data
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        stream = self.tp(raw)

        stream.write(b"123")
        stream.flush()
        stream.write(b"45")
        stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", stream.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        def _peek_in_place(stream):
            stream.peek(1)
        self.check_writes(_peek_in_place)

        def _peek_one_back(stream):
            # Step back one byte, peek, then restore the position.
            saved_pos = stream.tell()
            stream.seek(-1, 1)
            stream.peek(1)
            stream.seek(saved_pos, 0)
        self.check_writes(_peek_one_back)

    def test_writes_and_reads(self):
        def _reread_last_byte(stream):
            stream.seek(-1, 1)
            stream.read(1)
        self.check_writes(_reread_last_byte)

    def test_writes_and_read1s(self):
        def _reread_last_byte(stream):
            stream.seek(-1, 1)
            stream.read1(1)
        self.check_writes(_reread_last_byte)

    def test_writes_and_readintos(self):
        def _reread_last_byte(stream):
            stream.seek(-1, 1)
            stream.readinto(bytearray(1))
        self.check_writes(_reread_last_byte)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            stream = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(stream.read(1), b"A")
            self.assertEqual(stream.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            stream.write(b"B" * overwrite_size)
            end_pos = overwrite_size + 1
            self.assertEqual(stream.tell(), end_pos)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            stream.flush()
            self.assertEqual(stream.tell(), end_pos)
            expected = (b"A" + b"B" * overwrite_size
                        + b"A" * (9 - overwrite_size))
            self.assertEqual(raw.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(stream, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            stream.seek(pos1)
            stream.read(pos2 - pos1)
            stream.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            stream.seek(pos1)
            stream.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for first in range(0, size):
            for second in range(first, size):
                raw = self.BytesIO(original)
                stream = self.tp(raw, 100)
                mutate(stream, first, second)
                stream.flush()
                expected = bytearray(original)
                # Same order as the writes: when both positions coincide the
                # later write (1) wins.
                expected[second] = 2
                expected[first] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d"
                                 % (first, second))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        stream = self.tp(raw, 100)
        self.assertEqual(stream.read(2), b"AA")  # the read buffer gets filled
        self.assertEqual(stream.truncate(), 2)
        self.assertEqual(stream.write(b"BB"), 2)  # the write buffer increases
        self.assertEqual(stream.truncate(), 4)

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw, self.tp(raw, 100) as stream:
            stream.write(b"1")
            self.assertEqual(stream.read(1), b'b')
            stream.write(b'2')
            self.assertEqual(stream.read1(1), b'd')
            stream.write(b'3')
            one_byte = bytearray(1)
            stream.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            stream.write(b'4')
            self.assertEqual(stream.peek(1), b'h')
            stream.flush()
            self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw, self.tp(raw, 100) as stream:
            self.assertEqual(stream.read(1), b'a')
            stream.write(b"2")
            self.assertEqual(stream.read(1), b'c')
            stream.flush()
            self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Alternate single-byte writes with readline() calls and check both
        # the lines returned and the final contents of the raw stream.
        with self.BytesIO(b'ab\ncdef\ng\n') as raw, self.tp(raw) as stream:
            stream.write(b'1')
            self.assertEqual(stream.readline(), b'b\n')
            stream.write(b'2')
            self.assertEqual(stream.readline(), b'def\n')
            stream.write(b'3')
            self.assertEqual(stream.readline(), b'\n')
            stream.flush()
            self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1840
R David Murray5b2cf5e2013-02-23 22:11:21 -05001841
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
                          BufferedRandomTest, SizeofTest):
    # Runs the BufferedRandom tests against io.BufferedRandom and adds
    # checks that only apply to this implementation.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        stream = self.tp(raw)
        # Re-initializing with an absurdly large buffer size must fail.
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors, stream.__init__, raw, sys.maxsize)

    def test_garbage_collection(self):
        CBufferedReaderTest.test_garbage_collection(self)
        CBufferedWriterTest.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275: the TypeError raised for too many positional
        # arguments must name the class being constructed.
        self.assertRaisesRegexp(TypeError, "BufferedRandom",
                                self.tp, io.BytesIO(), 1024, 1024, 1024)
1864
1865
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the tests inherited from BufferedRandomTest with ``tp`` bound to
    # pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1868
1869
Christian Heimes1a6387e2008-03-26 12:49:49 +00001870# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1871# properties:
1872# - A single output character can correspond to many bytes of input.
1873# - The number of input bytes to complete the character can be
1874# undetermined until the last input byte is received.
1875# - The number of input bytes can vary depending on previous input.
1876# - A single input byte can correspond to many characters of output.
1877# - The number of output characters can be undetermined until the
1878# last input byte is received.
1879# - The number of output characters can vary depending on previous input.
1880
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        """Create a decoder in its initial state (I = O = 1, empty buffer)."""
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Restore the initial state: I = 1, O = 1 and no buffered input."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return ``(buffered_bytes, flags)`` describing the current state.

        ``flags`` packs I and O as ``(I ^ 1) * 100 + (O ^ 1)``.
        """
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously returned by getstate()."""
        pending, flags = state
        self.buffer = bytearray(pending)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Decode *input* and return the text produced so far.

        Incomplete words stay buffered until a later call completes them;
        when *final* is true, a non-empty buffer is emitted as the last word.
        """
        output = ''
        period = ord('.')
        # Iterate over a bytearray so that every item is an integer whatever
        # bytes-like type was passed in; this matches the integer comparisons
        # done in process_word().
        for b in bytearray(input):
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    # Extra periods (empty words) are ignored.
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer: # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i<number>' and 'o<number>' words update I and O and produce no
        output; any other word is padded or truncated to O characters (left
        verbatim when O is 0) and followed by a period.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # The codec lookup below only answers while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the 'test_decoder' codec.

        Return a CodecInfo that encodes as latin-1 and decodes incrementally
        with this class, or None when the codec is disabled or *name* is a
        different codec.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
1965
# Register the decoder defined above as a codec search function, for testing.
# The lookup is disabled by default (codecEnabled is False); tests that need
# the 'test_decoder' codec enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1969
1970
Christian Heimes1a6387e2008-03-26 12:49:49 +00001971class StatefulIncrementalDecoderTest(unittest.TestCase):
1972 """
1973 Make sure the StatefulIncrementalDecoder actually works.
1974 """
1975
1976 test_cases = [
1977 # I=1, O=1 (fixed-length input == fixed-length output)
1978 (b'abcd', False, 'a.b.c.d.'),
1979 # I=0, O=0 (variable-length input, variable-length output)
1980 (b'oiabcd', True, 'abcd.'),
1981 # I=0, O=0 (should ignore extra periods)
1982 (b'oi...abcd...', True, 'abcd.'),
1983 # I=0, O=6 (variable-length input, fixed-length output)
1984 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1985 # I=2, O=6 (fixed-length input < fixed-length output)
1986 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1987 # I=6, O=3 (fixed-length input > fixed-length output)
1988 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1989 # I=0, then 3; O=29, then 15 (with longer output)
1990 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1991 'a----------------------------.' +
1992 'b----------------------------.' +
1993 'cde--------------------------.' +
1994 'abcdefghijabcde.' +
1995 'a.b------------.' +
1996 '.c.------------.' +
1997 'd.e------------.' +
1998 'k--------------.' +
1999 'l--------------.' +
2000 'm--------------.')
2001 ]
2002
Antoine Pitrou19690592009-06-12 20:14:08 +00002003 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002004 # Try a few one-shot test cases.
2005 for input, eof, output in self.test_cases:
2006 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00002007 self.assertEqual(d.decode(input, eof), output)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002008
2009 # Also test an unfinished decode, followed by forcing EOF.
2010 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00002011 self.assertEqual(d.decode(b'oiabcd'), '')
2012 self.assertEqual(d.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00002013
2014class TextIOWrapperTest(unittest.TestCase):
2015
def setUp(self):
    # Start every test without a leftover scratch file.
    support.unlink(support.TESTFN)
    # Sample data mixing "\r\n", "\r" and "\n" line endings, together with
    # the text obtained when every ending is normalized to "\n".
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00002019 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002020
def tearDown(self):
    # Remove the scratch file a test may have created.
    support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002023
def test_constructor(self):
    # __init__ may be called again on an existing wrapper; each call must
    # install the newly requested settings and reject invalid ones.
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)
    text.__init__(buffered, encoding="latin1", newline="\r\n")
    self.assertEqual(text.encoding, "latin1")
    self.assertEqual(text.line_buffering, False)
    text.__init__(buffered, encoding="utf8", line_buffering=True)
    self.assertEqual(text.encoding, "utf8")
    self.assertEqual(text.line_buffering, True)
    # The two raw bytes decode to a single character under utf8.
    self.assertEqual("\xe9\n", text.readline())
    with self.assertRaises(TypeError):
        text.__init__(buffered, newline=42)
    with self.assertRaises(ValueError):
        text.__init__(buffered, newline='xyzzy')
2037
def test_uninitialized(self):
    # Creating and discarding an object that was never initialized must be
    # harmless.
    wrapper_type = self.TextIOWrapper
    text = wrapper_type.__new__(wrapper_type)
    del text
    text = wrapper_type.__new__(wrapper_type)
    # Using it before __init__ must fail with an exception.
    self.assertRaises(Exception, repr, text)
    self.assertRaisesRegexp((ValueError, AttributeError),
                            'uninitialized|has no attribute',
                            text.read, 0)
    # After initialization the object is usable.
    text.__init__(self.MockRawIO())
    self.assertEqual(text.read(0), u'')
2048
def test_non_text_encoding_codecs_are_rejected(self):
    # Ensure the constructor complains if passed a codec that isn't
    # marked as a text encoding
    # http://bugs.python.org/issue20404
    buffered = self.BufferedWriter(self.BytesIO())
    with support.check_py3k_warnings():
        self.TextIOWrapper(buffered, encoding="hex_codec")
2057
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    text = self.TextIOWrapper(buffered)
    # detach() hands back the underlying buffered object.
    self.assertIs(text.detach(), buffered)

    text = self.TextIOWrapper(buffered, encoding="ascii")
    text.write("howdy")
    self.assertFalse(raw.getvalue())
    # Detaching flushes the pending text down to the raw stream.
    text.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    # A second detach() has nothing left to detach.
    with self.assertRaises(ValueError):
        text.detach()

    # Operations independent of the detached stream should still work
    repr(text)
    self.assertEqual(text.encoding, "ascii")
    self.assertEqual(text.errors, "strict")
    self.assertFalse(text.line_buffering)
2076
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    module_name = self.TextIOWrapper.__module__
    # Without a name on the raw stream only the encoding is shown.
    expected = "<%s.TextIOWrapper encoding='utf-8'>" % module_name
    self.assertEqual(repr(text), expected)
    # A unicode name is shown with its u'' prefix ...
    raw.name = "dummy"
    expected = ("<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>"
                % module_name)
    self.assertEqual(repr(text), expected)
    # ... and a bytes name without it.
    raw.name = b"dummy"
    expected = ("<%s.TextIOWrapper name='dummy' encoding='utf-8'>"
                % module_name)
    self.assertEqual(repr(text), expected)

    text.buffer.detach()
    repr(text)  # Should not raise an exception
2093
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)
    text.write("X")
    self.assertEqual(raw.getvalue(), b"")  # No flush happened
    text.write("Y\nZ")
    self.assertEqual(raw.getvalue(), b"XY\nZ")  # All got flushed
    # A write containing "\r" is flushed to the raw stream as well.
    text.write("A\rB")
    self.assertEqual(raw.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002104
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()
    explicit = self.TextIOWrapper(raw, encoding="utf8")
    self.assertEqual(explicit.encoding, "utf8")
    # With no encoding given, a default must be filled in, and it must name
    # a codec that can be looked up.
    implicit = self.TextIOWrapper(raw)
    self.assertIsNotNone(implicit.encoding)
    codecs.lookup(implicit.encoding)
2113
def test_encoding_errors_reading(self):
    def ascii_reader(**options):
        # Fresh ASCII wrapper over data that contains one non-ASCII byte.
        return self.TextIOWrapper(self.BytesIO(b"abc\n\xff\n"),
                                  encoding="ascii", **options)

    # (1) default
    with self.assertRaises(UnicodeError):
        ascii_reader().read()
    # (2) explicit strict
    with self.assertRaises(UnicodeError):
        ascii_reader(errors="strict").read()
    # (3) ignore
    self.assertEqual(ascii_reader(errors="ignore").read(), "abc\n\n")
    # (4) replace
    self.assertEqual(ascii_reader(errors="replace").read(),
                     "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002131
def test_encoding_errors_writing(self):
    # (1) default
    sink = self.BytesIO()
    text = self.TextIOWrapper(sink, encoding="ascii")
    with self.assertRaises(UnicodeError):
        text.write("\xff")
    # (2) explicit strict
    sink = self.BytesIO()
    text = self.TextIOWrapper(sink, encoding="ascii", errors="strict")
    with self.assertRaises(UnicodeError):
        text.write("\xff")
    # (3) ignore: the character that cannot be encoded is dropped
    sink = self.BytesIO()
    text = self.TextIOWrapper(sink, encoding="ascii", errors="ignore",
                              newline="\n")
    text.write("abc\xffdef\n")
    text.flush()
    self.assertEqual(sink.getvalue(), b"abcdef\n")
    # (4) replace: the character that cannot be encoded becomes "?"
    sink = self.BytesIO()
    text = self.TextIOWrapper(sink, encoding="ascii", errors="replace",
                              newline="\n")
    text.write("abc\xffdef\n")
    text.flush()
    self.assertEqual(sink.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002155
def test_newlines(self):
    input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

    # Pairs of (newline argument, lines expected when reading the joined
    # input back with that argument).
    tests = [
        [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
        [ '', input_lines ],
        [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
        [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
        [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        # XXX: str.encode() should return bytes
        data = bytes(''.join(input_lines).encode(encoding))
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    buffered = self.BufferedReader(self.BytesIO(data),
                                                   bufsize)
                    text = self.TextIOWrapper(buffered, newline=newline,
                                              encoding=encoding)
                    if not do_reads:
                        # Plain line iteration.
                        got_lines = list(text)
                    else:
                        # Mix fixed-size reads with readline(): take two
                        # characters, then the rest of the line, until
                        # read() reports end of file.
                        got_lines = []
                        for head in iter(lambda: text.read(2), ''):
                            self.assertEqual(len(head), 2)
                            got_lines.append(head + text.readline())

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002197
Antoine Pitrou19690592009-06-12 20:14:08 +00002198 def test_newlines_input(self):
2199 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002200 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2201 for newline, expected in [
2202 (None, normalized.decode("ascii").splitlines(True)),
2203 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00002204 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2205 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2206 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00002207 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00002208 buf = self.BytesIO(testdata)
2209 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti2623a372010-11-21 13:34:58 +00002210 self.assertEqual(txt.readlines(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002211 txt.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002212 self.assertEqual(txt.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002213
Antoine Pitrou19690592009-06-12 20:14:08 +00002214 def test_newlines_output(self):
2215 testdict = {
2216 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2217 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2218 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2219 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2220 }
2221 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2222 for newline, expected in tests:
2223 buf = self.BytesIO()
2224 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2225 txt.write("AAA\nB")
2226 txt.write("BB\nCCC\n")
2227 txt.write("X\rY\r\nZ")
2228 txt.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002229 self.assertEqual(buf.closed, False)
2230 self.assertEqual(buf.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00002231
def test_destructor(self):
    # Dropping the last reference to the wrapper must flush pending text
    # and close the underlying buffer exactly once.
    seen_on_close = []
    parent = self.BytesIO

    class RecordingBytesIO(parent):
        def close(self):
            # Capture the buffer contents at the moment of closing.
            seen_on_close.append(self.getvalue())
            parent.close(self)

    b = RecordingBytesIO()
    t = self.TextIOWrapper(b, encoding="ascii")
    t.write("abc")
    del t
    support.gc_collect()
    self.assertEqual([b"abc"], seen_on_close)
Antoine Pitrou19690592009-06-12 20:14:08 +00002245
def test_override_destructor(self):
    # Overridden __del__, close() and flush() must be invoked, in that
    # order, when the wrapper is collected.
    calls = []

    class TracingTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The base class may or may not define __del__.
            inherited = getattr(super(TracingTextIO, self), "__del__", None)
            if inherited is not None:
                inherited()

        def close(self):
            calls.append(2)
            super(TracingTextIO, self).close()

        def flush(self):
            calls.append(3)
            super(TracingTextIO, self).flush()

    b = self.BytesIO()
    t = TracingTextIO(b, encoding="ascii")
    del t
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2268
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception IOError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002283
2284 # Systematic tests of the text I/O API
2285
def test_basic_io(self):
    # Round-trip write/read/seek/tell for several encodings and chunk
    # sizes.  Files are opened with "with" so that they are closed even
    # when an assertion fails part-way through.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        # "utf-16-be" and "utf-16-le" are currently not exercised here.
        for enc in "ascii", "latin1", "utf8":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Opaque position at end of file, reused below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2314
def multi_line_test(self, f, enc):
    # Write lines of many different lengths, recording tell() before each
    # write, then read them back and check that both the text and the
    # positions reported by tell() match.  *enc* is not used here.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        body = "".join(sample[i % len(sample)] for i in range(size))
        line = body + "\n"
        written.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    read_back = []
    pos = f.tell()
    line = f.readline()
    while line:
        read_back.append((pos, line))
        pos = f.tell()
        line = f.readline()
    self.assertEqual(read_back, written)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002336
def test_telling(self):
    # tell() must report consistent positions across writes and
    # readline() calls, and must raise IOError while the file is being
    # iterated.  The file is opened with "with" so that it is closed even
    # when an assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            self.assertRaises(IOError, f.tell)
        self.assertEqual(f.tell(), p2)
2356
def test_seeking(self):
    # Place a multi-byte character two characters before the default chunk
    # size and check read()/tell()/readline() around it.  Both file
    # handles are managed by "with"; the read handle was previously never
    # closed.
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = bytes(u_suffix.encode("utf-8"))
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002374
def test_seeking_too(self):
    # Regression test for a specific bug: tell() after readline() with a
    # chunk size smaller than one encoded character.  Both file handles
    # are managed by "with"; the read handle was previously never closed.
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
2386
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def check_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        f = self.open(support.TESTFN, 'wb')
        f.write(data)
        f.close()
        f = self.open(support.TESTFN, encoding='test_decoder')
        f._CHUNK_SIZE = CHUNK_SIZE
        decoded = f.read()
        f.close()

        total = len(decoded)
        for start in range(min_pos, total + 1):  # seek positions
            for length in [1, 5, total - start]:  # read lengths
                f = self.open(support.TESTFN, encoding='test_decoder')
                self.assertEqual(f.read(start), decoded[:start])
                cookie = f.tell()
                self.assertEqual(f.read(length),
                                 decoded[start:start + length])
                f.seek(cookie)
                self.assertEqual(f.read(), decoded[start:])
                f.close()

    cases = StatefulIncrementalDecoderTest.test_cases

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for case in cases:
            check_with_data(case[0])

        # Position each test case so that it crosses a chunk boundary.
        for case in cases:
            payload = case[0]
            offset = CHUNK_SIZE - len(payload)//2
            padding = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            check_with_data(padding + payload, offset*2)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002433
Antoine Pitrou19690592009-06-12 20:14:08 +00002434 def test_encoded_writes(self):
2435 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002436 tests = ("utf-16",
2437 "utf-16-le",
2438 "utf-16-be",
2439 "utf-32",
2440 "utf-32-le",
2441 "utf-32-be")
2442 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002443 buf = self.BytesIO()
2444 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002445 # Check if the BOM is written only once (see issue1753).
2446 f.write(data)
2447 f.write(data)
2448 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002449 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002450 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002451 self.assertEqual(f.read(), data * 2)
2452 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002453
Antoine Pitrou19690592009-06-12 20:14:08 +00002454 def test_unreadable(self):
2455 class UnReadable(self.BytesIO):
2456 def readable(self):
2457 return False
2458 txt = self.TextIOWrapper(UnReadable())
2459 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002460
Antoine Pitrou19690592009-06-12 20:14:08 +00002461 def test_read_one_by_one(self):
2462 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002463 reads = ""
2464 while True:
2465 c = txt.read(1)
2466 if not c:
2467 break
2468 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002469 self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002470
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002471 def test_readlines(self):
2472 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2473 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2474 txt.seek(0)
2475 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2476 txt.seek(0)
2477 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2478
Christian Heimes1a6387e2008-03-26 12:49:49 +00002479 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002480 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002481 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002482 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002483 reads = ""
2484 while True:
2485 c = txt.read(128)
2486 if not c:
2487 break
2488 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002489 self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002490
Antoine Pitroueadca1d2012-10-16 23:02:27 +02002491 def test_writelines(self):
2492 l = ['ab', 'cd', 'ef']
2493 buf = self.BytesIO()
2494 txt = self.TextIOWrapper(buf)
2495 txt.writelines(l)
2496 txt.flush()
2497 self.assertEqual(buf.getvalue(), b'abcdef')
2498
def test_writelines_userlist(self):
    # writelines() must also accept a UserList of strings.
    sink = self.BytesIO()
    writer = self.TextIOWrapper(sink)
    writer.writelines(UserList(['ab', 'cd', 'ef']))
    writer.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
2506
2507 def test_writelines_error(self):
2508 txt = self.TextIOWrapper(self.BytesIO())
2509 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2510 self.assertRaises(TypeError, txt.writelines, None)
2511 self.assertRaises(TypeError, txt.writelines, b'abc')
2512
Christian Heimes1a6387e2008-03-26 12:49:49 +00002513 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002514 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002515
2516 # read one char at a time
2517 reads = ""
2518 while True:
2519 c = txt.read(1)
2520 if not c:
2521 break
2522 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002523 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002524
2525 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002526 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002527 txt._CHUNK_SIZE = 4
2528
2529 reads = ""
2530 while True:
2531 c = txt.read(4)
2532 if not c:
2533 break
2534 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002535 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002536
2537 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002538 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002539 txt._CHUNK_SIZE = 4
2540
2541 reads = txt.read(4)
2542 reads += txt.read(4)
2543 reads += txt.readline()
2544 reads += txt.readline()
2545 reads += txt.readline()
Ezio Melotti2623a372010-11-21 13:34:58 +00002546 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002547
2548 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002549 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002550 txt._CHUNK_SIZE = 4
2551
2552 reads = txt.read(4)
2553 reads += txt.read()
Ezio Melotti2623a372010-11-21 13:34:58 +00002554 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002555
2556 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002557 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002558 txt._CHUNK_SIZE = 4
2559
2560 reads = txt.read(4)
2561 pos = txt.tell()
2562 txt.seek(0)
2563 txt.seek(pos)
Ezio Melotti2623a372010-11-21 13:34:58 +00002564 self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002565
2566 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002567 buffer = self.BytesIO(self.testdata)
2568 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002569
2570 self.assertEqual(buffer.seekable(), txt.seekable())
2571
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as f:
            f.write('aaa')
            f.tell()  # result unused; the call itself is kept
        with self.open(path, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(path, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(path, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002586
def test_seek_bom(self):
    # Same as the append case, but the write position is reached by
    # seeking manually: no second BOM may be written.
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as f:
            f.write('aaa')
            end_of_text = f.tell()
        with self.open(path, 'r+', encoding=charset) as f:
            f.seek(end_of_text)
            f.write('zzz')
            f.seek(0)
            f.write('bbb')
        with self.open(path, 'rb') as f:
            self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002601
def test_errors_property(self):
    # The "errors" attribute reports "strict" when no handler is given and
    # otherwise the handler passed to open().
    cases = (({}, "strict"), ({"errors": "replace"}, "replace"))
    for extra, wanted in cases:
        with self.open(support.TESTFN, "w", **extra) as f:
            self.assertEqual(f.errors, wanted)
2607
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    thread_count = 20
    go = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = "Thread%03d\n" % n
            # Block until all threads are released together.
            go.wait()
            f.write(text)
        workers = [threading.Thread(target=run, args=(x,))
                   for x in range(thread_count)]
        with support.start_threads(workers, go.set):
            time.sleep(0.02)
    with self.open(support.TESTFN) as f:
        content = f.read()
        for n in range(thread_count):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002625
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002626 def test_flush_error_on_close(self):
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002627 # Test that text file is closed despite failed flush
2628 # and that flush() is called before file closed.
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002629 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002630 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002631 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002632 closed[:] = [txt.closed, txt.buffer.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002633 raise IOError()
2634 txt.flush = bad_flush
2635 self.assertRaises(IOError, txt.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -06002636 self.assertTrue(txt.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002637 self.assertTrue(txt.buffer.closed)
2638 self.assertTrue(closed) # flush() called
2639 self.assertFalse(closed[0]) # flush() called before file closed
2640 self.assertFalse(closed[1])
Serhiy Storchaka437d5352015-02-23 00:28:38 +02002641 txt.flush = lambda: None # break reference loop
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002642
2643 def test_multi_close(self):
2644 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2645 txt.close()
2646 txt.close()
2647 txt.close()
2648 self.assertRaises(ValueError, txt.flush)
2649
Antoine Pitroufc9ead62010-12-21 21:26:55 +00002650 def test_readonly_attributes(self):
2651 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2652 buf = self.BytesIO(self.testdata)
2653 with self.assertRaises((AttributeError, TypeError)):
2654 txt.buffer = buf
2655
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002656 def test_read_nonbytes(self):
2657 # Issue #17106
2658 # Crash when underlying read() returns non-bytes
2659 class NonbytesStream(self.StringIO):
2660 read1 = self.StringIO.read
2661 class NonbytesStream(self.StringIO):
2662 read1 = self.StringIO.read
2663 t = self.TextIOWrapper(NonbytesStream('a'))
2664 with self.maybeRaises(TypeError):
2665 t.read(1)
2666 t = self.TextIOWrapper(NonbytesStream('a'))
2667 with self.maybeRaises(TypeError):
2668 t.readline()
2669 t = self.TextIOWrapper(NonbytesStream('a'))
2670 self.assertEqual(t.read(), u'a')
2671
def test_illegal_encoder(self):
    # bpo-31271: A TypeError should be raised in case the return value of
    # encoder's encode() is invalid.
    class TextReturningEncoder:
        def encode(self, dummy):
            return u'spam'

    def make_encoder(dummy):
        return TextReturningEncoder()

    rot13 = codecs.lookup("rot13")
    # NOTE(review): this uses io.TextIOWrapper/io.BytesIO directly rather
    # than self.TextIOWrapper/self.BytesIO -- confirm that is intended.
    mark_as_text = support.swap_attr(rot13, '_is_text_encoding', True)
    install_encoder = support.swap_attr(rot13, 'incrementalencoder',
                                        make_encoder)
    with mark_as_text, install_encoder:
        t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
    with self.assertRaises(TypeError):
        t.write('bar')
        t.flush()
2687
def test_illegal_decoder(self):
    # Issue #17106
    # Bypass the early encoding check added in issue 20404
    def _make_illegal_wrapper():
        quopri = codecs.lookup("quopri_codec")
        quopri._is_text_encoding = True
        try:
            return self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
                                      newline='\n', encoding="quopri_codec")
        finally:
            quopri._is_text_encoding = False

    def _make_plain_wrapper():
        # Build the wrapper without touching _is_text_encoding.
        with support.check_py3k_warnings():
            return self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
                                      encoding='quopri_codec')

    # Crash when decoder returns non-string
    read_calls = (
        lambda t: t.read(1),
        lambda t: t.readline(),
        lambda t: t.read(),
    )
    for call in read_calls:
        t = _make_plain_wrapper()
        with self.maybeRaises(TypeError):
            call(t)

    # Issue 31243: calling read() while the return value of decoder's
    # getstate() is invalid should neither crash the interpreter nor
    # raise a SystemError.
    def _make_very_illegal_wrapper(getstate_ret_val):
        class BadDecoder:
            def getstate(self):
                return getstate_ret_val
        def _get_bad_decoder(dummy):
            return BadDecoder()
        quopri = codecs.lookup("quopri_codec")
        with support.swap_attr(quopri, 'incrementaldecoder',
                               _get_bad_decoder):
            return _make_illegal_wrapper()

    for bad_state in (42, ()):
        t = _make_very_illegal_wrapper(bad_state)
        with self.maybeRaises(TypeError):
            t.read(42)
2743
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002744
Antoine Pitrou19690592009-06-12 20:14:08 +00002745class CTextIOWrapperTest(TextIOWrapperTest):
2746
2747 def test_initialization(self):
2748 r = self.BytesIO(b"\xc3\xa9\n\n")
2749 b = self.BufferedReader(r, 1000)
2750 t = self.TextIOWrapper(b)
2751 self.assertRaises(TypeError, t.__init__, b, newline=42)
2752 self.assertRaises(ValueError, t.read)
2753 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2754 self.assertRaises(ValueError, t.read)
2755
Benjamin Peterson53ae6142014-12-21 20:51:50 -06002756 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2757 self.assertRaises(Exception, repr, t)
2758
Antoine Pitrou19690592009-06-12 20:14:08 +00002759 def test_garbage_collection(self):
2760 # C TextIOWrapper objects are collected, and collecting them flushes
2761 # all data to disk.
2762 # The Python version has __del__, so it ends in gc.garbage instead.
2763 rawio = io.FileIO(support.TESTFN, "wb")
2764 b = self.BufferedWriter(rawio)
2765 t = self.TextIOWrapper(b, encoding="ascii")
2766 t.write("456def")
2767 t.x = t
2768 wr = weakref.ref(t)
2769 del t
2770 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03002771 self.assertIsNone(wr(), wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002772 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002773 self.assertEqual(f.read(), b"456def")
2774
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002775 def test_rwpair_cleared_before_textio(self):
2776 # Issue 13070: TextIOWrapper's finalization would crash when called
2777 # after the reference to the underlying BufferedRWPair's writer got
2778 # cleared by the GC.
2779 for i in range(1000):
2780 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2781 t1 = self.TextIOWrapper(b1, encoding="ascii")
2782 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2783 t2 = self.TextIOWrapper(b2, encoding="ascii")
2784 # circular references
2785 t1.buddy = t2
2786 t2.buddy = t1
2787 support.gc_collect()
2788
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002789 maybeRaises = unittest.TestCase.assertRaises
2790
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002791
Antoine Pitrou19690592009-06-12 20:14:08 +00002792class PyTextIOWrapperTest(TextIOWrapperTest):
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002793 @contextlib.contextmanager
2794 def maybeRaises(self, *args, **kwds):
2795 yield
Antoine Pitrou19690592009-06-12 20:14:08 +00002796
2797
2798class IncrementalNewlineDecoderTest(unittest.TestCase):
2799
def check_newline_decoding_utf8(self, decoder):
    # UTF-8 specific tests for a newline decoder
    def verify(data, expected, **kwargs):
        # We exercise getstate() / setstate() as well as decode(): the
        # same input is decoded twice from the same saved state.
        saved = decoder.getstate()
        self.assertEqual(decoder.decode(data, **kwargs), expected)
        decoder.setstate(saved)
        self.assertEqual(decoder.decode(data, **kwargs), expected)

    whole_char = b'\xe8\xa2\x88'
    split_char = [(b'\xe8', ""), (b'\xa2', ""), (b'\x88', "\u8888")]

    verify(whole_char, "\u8888")

    # The same character delivered one byte at a time, twice over.
    for _ in range(2):
        for data, expected in split_char:
            verify(data, expected)

    # A truncated character is an error once the input is final.
    verify(b'\xe8', "")
    self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

    decoder.reset()
    verify(b'\n', "\n")
    verify(b'\r', "")
    verify(b'', "\n", final=True)
    verify(b'\r', "\n", final=True)

    remaining = [
        (b'\r', ""),
        (b'a', "\na"),

        (b'\r\r\n', "\n\n"),
        (b'\r', ""),
        (b'\r', "\n"),
        (b'\na', "\na"),

        (b'\xe8\xa2\x88\r\n', "\u8888\n"),
        (whole_char, "\u8888"),
        (b'\n', "\n"),
        (b'\xe8\xa2\x88\r', "\u8888"),
        (b'\n', "\n"),
    ]
    for data, expected in remaining:
        verify(data, expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002841
def check_newline_decoding(self, decoder, encoding):
    # Feed the decoder one unit at a time and check both the translated
    # output and the newline kinds reported by decoder.newlines.
    # With encoding=None the input is passed to the decoder unencoded.
    decoded = []
    if encoding is None:
        encoder = None
        def feed(text):
            # Decode one char at a time
            for char in text:
                decoded.append(decoder.decode(char))
    else:
        encoder = codecs.getincrementalencoder(encoding)()
        def feed(text):
            # Decode one byte at a time
            for byte in encoder.encode(text):
                decoded.append(decoder.decode(byte))

    self.assertEqual(decoder.newlines, None)
    newlines_after = [
        ("abc\n\r", '\n'),
        ("\nabc", ('\n', '\r\n')),
        ("abc\r", ('\n', '\r\n')),
        ("abc", ('\r', '\n', '\r\n')),
    ]
    for text, newlines in newlines_after:
        feed(text)
        self.assertEqual(decoder.newlines, newlines)
    feed("abc\r")
    self.assertEqual("".join(decoded), "abc\n\nabcabc\nabcabc")

    # After reset() the decoder must have forgotten the newlines it saw.
    decoder.reset()
    payload = "abc"
    if encoder is not None:
        encoder.reset()
        payload = encoder.encode(payload)
    self.assertEqual(decoder.decode(payload), "abc")
    self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002874
2875 def test_newline_decoder(self):
2876 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002877 # None meaning the IncrementalNewlineDecoder takes unicode input
2878 # rather than bytes input
2879 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002880 'utf-16', 'utf-16-le', 'utf-16-be',
2881 'utf-32', 'utf-32-le', 'utf-32-be',
2882 )
2883 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002884 decoder = enc and codecs.getincrementaldecoder(enc)()
2885 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2886 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002887 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002888 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2889 self.check_newline_decoding_utf8(decoder)
2890
2891 def test_newline_bytes(self):
2892 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2893 def _check(dec):
Ezio Melotti2623a372010-11-21 13:34:58 +00002894 self.assertEqual(dec.newlines, None)
2895 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2896 self.assertEqual(dec.newlines, None)
2897 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2898 self.assertEqual(dec.newlines, None)
Antoine Pitrou19690592009-06-12 20:14:08 +00002899 dec = self.IncrementalNewlineDecoder(None, translate=False)
2900 _check(dec)
2901 dec = self.IncrementalNewlineDecoder(None, translate=True)
2902 _check(dec)
2903
# Variant of the decoder tests for the C implementation: test_main() copies
# the names of the io module onto every test class whose name starts
# with "C".
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    pass
2906
# Variant of the decoder tests for the Python implementation: test_main()
# copies the names of the pyio module onto every test class whose name
# starts with "Py".
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002909
Christian Heimes1a6387e2008-03-26 12:49:49 +00002910
2911# XXX Tests for open()
2912
class MiscIOTest(unittest.TestCase):
    # Miscellaneous checks on the io module as a whole.  The module under
    # test is read from self.io (set by the concrete subclasses); names such
    # as self.open, self.IOBase or self.BlockingIOError are injected on the
    # concrete subclasses by test_main().

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Every exported name must exist and, apart from open() and the
        # SEEK_* constants, be either an exception or an IOBase subclass.
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertIsNotNone(obj, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                # Report the offending name on failure, like the checks above.
                self.assertTrue(issubclass(obj, self.IOBase), name)

    def test_attributes(self):
        # Check the name and mode attributes at each layer of the stack.
        # The files are opened in with-blocks so that a failing assertion
        # cannot leave TESTFN open when tearDown() tries to unlink it.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with self.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name,            support.TESTFN)
            self.assertEqual(f.buffer.name,     support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode,            "U")
            self.assertEqual(f.buffer.mode,     "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode,            "w+")
            self.assertEqual(f.buffer.mode,     "rb+") # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # A second object on the same descriptor; closefd=False so that
            # closing it leaves the descriptor to f.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode,     "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name,     f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        # Every I/O method must raise ValueError once the file is closed,
        # whatever combination of mode and buffering was used to open it.
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # The methods guarded by hasattr() only exist on some layers.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readall"):
                self.assertRaises(ValueError, f.readall)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.readlines, 1)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)
        # Build a reference cycle between the exception and its argument
        # and check that the garbage collector is able to reclaim it.
        class C(unicode):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        # abcmodule is any object exposing IOBase, RawIOBase, BufferedIOBase
        # and TextIOBase.  Open a raw, a buffered and a text file in turn and
        # check each one is an instance of exactly the expected base classes.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_bigbuf(self):
        self._test_nonblock_pipe_write(16*1024)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_smallbuf(self):
        self._test_nonblock_pipe_write(1024)

    def _set_non_blocking(self, fd):
        # Add O_NONBLOCK to the status flags of the descriptor fd.
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        self.assertNotEqual(flags, -1)
        res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        self.assertEqual(res, 0)

    def _test_nonblock_pipe_write(self, bufsize):
        # Write through a non-blocking pipe until BlockingIOError is raised,
        # drain the read end, and finally check that everything recorded as
        # sent was received.  bufsize is used for both buffered objects.
        sent = []
        received = []
        r, w = os.pipe()
        self._set_non_blocking(r)
        self._set_non_blocking(w)

        # To exercise all code paths in the C implementation we need
        # to play with buffer sizes.  For instance, if we choose a
        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
        # then we will never get a partial write of the buffer.
        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

        with rf, wf:
            for N in 9999, 73, 7574:
                try:
                    i = 0
                    while True:
                        # NOTE(review): on Python 2 bytes is str, so
                        # bytes([x]) is the repr text of a one-item list
                        # rather than a single byte.  Only equality of the
                        # sent and received data is checked below.
                        msg = bytes([i % 26 + 97]) * N
                        sent.append(msg)
                        wf.write(msg)
                        i += 1

                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    # Keep only the part of the last message that was
                    # reported as written.
                    sent[-1] = sent[-1][:e.characters_written]
                    received.append(rf.read())
                    msg = b'BLOCKED'
                    wf.write(msg)
                    sent.append(msg)

            # Flush what is left, draining the read end whenever the flush
            # would block.
            while True:
                try:
                    wf.flush()
                    break
                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.characters_written, 0)
                    received.append(rf.read())

            # Collect chunks until read() returns None.
            received += iter(rf.read, None)

        sent, received = b''.join(sent), b''.join(received)
        self.assertEqual(sent, received)
        self.assertTrue(wf.closed)
        self.assertTrue(rf.closed)
3113
class CMiscIOTest(MiscIOTest):
    # Module under test: the C implementation.
    io = io
    # NOTE(review): not referenced in the visible part of this file;
    # presumably an expected error message for a test defined elsewhere.
    shutdown_error = "RuntimeError: could not find io module state"
Antoine Pitrou19690592009-06-12 20:14:08 +00003117
class PyMiscIOTest(MiscIOTest):
    # Module under test: the Python implementation.
    io = pyio
    # NOTE(review): not referenced in the visible part of this file;
    # presumably an expected error message for a test defined elsewhere.
    shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Petersonad100c32008-11-20 22:06:22 +00003121
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003122
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    # Checks how reads and writes on a pipe behave when SIGALRM is delivered
    # while they are in progress.  The module under test is read from
    # self.io, which the concrete subclasses set.

    def setUp(self):
        # Install alarm_interrupt() and remember the previous handler.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Signal handler that deliberately raises ZeroDivisionError.
        1 // 0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
                     'issue #12429: skip test on FreeBSD <= 7')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit of data that gets repeated to build the payload,
        *bytes* is what is expected to come out of the pipe for it, and
        *fdopen_kwargs* are passed to open() for the write end."""
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        # Stays None if open() fails, so that the cleanup below does not
        # replace the original error with an UnboundLocalError.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed below.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            try:
                signal.alarm(1)
                with self.assertRaises(ZeroDivisionError):
                    wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1))
            finally:
                signal.alarm(0)
                t.join()

            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    def check_reentrant_write(self, data, **fdopen_kwargs):
        # Keep writing *data* while a SIGALRM handler writes to the very
        # same object, and check which exception comes out.
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1//0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read() into text for the
        comparison; *fdopen_kwargs* are passed to open() for the read end."""
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Stays None if open() fails, so that the cleanup below does not
        # replace the original error with an UnboundLocalError.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupterd_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupterd_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is a one-element piece of data that gets repeated to build
        the payload; *fdopen_kwargs* are passed to open() for the write
        end."""
        select = support.import_module("select")
        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        # One-item list so that the reader thread can report an exception.
        error = [None]
        def _read():
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                error[0] = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: install alarm2() and arm the timer again.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start the reader thread.
            t.start()
        signal.signal(signal.SIGALRM, alarm1)
        # Stays None if open() fails, so that the cleanup below does not
        # replace the original error with an UnboundLocalError.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            self.assertEqual(N, wio.write(item * N))
            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error[0])
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupterd_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupterd_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3323
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003324
class CSignalsTest(SignalsTest):
    # Module under test: the C implementation.
    io = io
3327
class PySignalsTest(SignalsTest):
    # Module under test: the Python implementation.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None
3335
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003336
def test_main():
    """Wire each test class to its io implementation and run the suite.

    Classes whose name starts with "C" receive the names of the io module,
    classes whose name starts with "Py" those of pyio; other classes are
    left untouched.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockRawIOWithoutRead)
    exported = io.__all__ + ["IncrementalNewlineDecoder"]
    module_globals = globals()

    def build_namespace(module, prefix):
        # Names exported by the module, plus each mock looked up under its
        # prefixed global name but stored under its unprefixed name.
        namespace = {name: getattr(module, name) for name in exported}
        for mock in mocks:
            namespace[mock.__name__] = module_globals[prefix + mock.__name__]
        return namespace

    c_namespace = build_namespace(io, "C")
    py_namespace = build_namespace(pyio, "Py")
    # Avoid turning open into a bound method.
    py_namespace["open"] = pyio.OpenWrapper

    # "C" is tried first, as in an if/elif chain; at most one prefix applies.
    for test in tests:
        for prefix, namespace in (("C", c_namespace), ("Py", py_namespace)):
            if test.__name__.startswith(prefix):
                for name, obj in namespace.items():
                    setattr(test, name, obj)
                break

    support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00003371
# Run the whole suite through test_main() when executed as a script.
if __name__ == "__main__":
    test_main()