blob: 26c5dfe9261b7f6db2afd705287914251e83339c [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakac72e66a2015-11-02 15:06:09 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Antoine Pitrou19690592009-06-12 20:14:08 +000019# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +030032import warnings
Antoine Pitrou19690592009-06-12 20:14:08 +000033import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000034import signal
35import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000036from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000037from collections import deque
Antoine Pitrou78e761e2012-10-16 22:57:11 +020038from UserList import UserList
Antoine Pitrou19690592009-06-12 20:14:08 +000039from test import test_support as support
Serhiy Storchaka354d50e2013-02-03 17:10:42 +020040import contextlib
Christian Heimes1a6387e2008-03-26 12:49:49 +000041
42import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000043import io # C implementation of io
44import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000045try:
46 import threading
47except ImportError:
48 threading = None
Antoine Pitrou5aa7df32011-11-21 20:16:44 +010049try:
50 import fcntl
51except ImportError:
52 fcntl = None
Antoine Pitrou19690592009-06-12 20:14:08 +000053
54__metaclass__ = type
55bytes = support.py3k_bytes
56
Martin Panterc9813d82016-06-03 05:59:20 +000057def byteslike(*pos, **kw):
58 return memoryview(bytearray(*pos, **kw))
59
Antoine Pitrou19690592009-06-12 20:14:08 +000060def _default_chunk_size():
61 """Get the default TextIOWrapper chunk size"""
62 with io.open(__file__, "r", encoding="latin1") as f:
63 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000064
65
Antoine Pitrou6391b342010-09-14 18:48:19 +000066class MockRawIOWithoutRead:
67 """A RawIO implementation without read(), so as to exercise the default
68 RawIO.read() which calls readinto()."""
Christian Heimes1a6387e2008-03-26 12:49:49 +000069
70 def __init__(self, read_stack=()):
71 self._read_stack = list(read_stack)
72 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +000073 self._reads = 0
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000074 self._extraneous_reads = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +000075
Christian Heimes1a6387e2008-03-26 12:49:49 +000076 def write(self, b):
Antoine Pitrou19690592009-06-12 20:14:08 +000077 self._write_stack.append(bytes(b))
Christian Heimes1a6387e2008-03-26 12:49:49 +000078 return len(b)
79
80 def writable(self):
81 return True
82
83 def fileno(self):
84 return 42
85
86 def readable(self):
87 return True
88
89 def seekable(self):
90 return True
91
92 def seek(self, pos, whence):
Antoine Pitrou19690592009-06-12 20:14:08 +000093 return 0 # wrong but we gotta return something
Christian Heimes1a6387e2008-03-26 12:49:49 +000094
95 def tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +000096 return 0 # same comment as above
97
98 def readinto(self, buf):
99 self._reads += 1
100 max_len = len(buf)
101 try:
102 data = self._read_stack[0]
103 except IndexError:
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000104 self._extraneous_reads += 1
Antoine Pitrou19690592009-06-12 20:14:08 +0000105 return 0
106 if data is None:
107 del self._read_stack[0]
108 return None
109 n = len(data)
110 if len(data) <= max_len:
111 del self._read_stack[0]
112 buf[:n] = data
113 return n
114 else:
115 buf[:] = data[:max_len]
116 self._read_stack[0] = data[max_len:]
117 return max_len
118
119 def truncate(self, pos=None):
120 return pos
121
Antoine Pitrou6391b342010-09-14 18:48:19 +0000122class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
123 pass
124
125class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
126 pass
127
128
129class MockRawIO(MockRawIOWithoutRead):
130
131 def read(self, n=None):
132 self._reads += 1
133 try:
134 return self._read_stack.pop(0)
135 except:
136 self._extraneous_reads += 1
137 return b""
138
Antoine Pitrou19690592009-06-12 20:14:08 +0000139class CMockRawIO(MockRawIO, io.RawIOBase):
140 pass
141
142class PyMockRawIO(MockRawIO, pyio.RawIOBase):
143 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000144
145
Antoine Pitrou19690592009-06-12 20:14:08 +0000146class MisbehavedRawIO(MockRawIO):
147 def write(self, b):
148 return MockRawIO.write(self, b) * 2
149
150 def read(self, n=None):
151 return MockRawIO.read(self, n) * 2
152
153 def seek(self, pos, whence):
154 return -123
155
156 def tell(self):
157 return -456
158
159 def readinto(self, buf):
160 MockRawIO.readinto(self, buf)
161 return len(buf) * 5
162
163class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
164 pass
165
166class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
167 pass
168
169
170class CloseFailureIO(MockRawIO):
171 closed = 0
172
173 def close(self):
174 if not self.closed:
175 self.closed = 1
176 raise IOError
177
178class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
179 pass
180
181class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
182 pass
183
184
185class MockFileIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000186
187 def __init__(self, data):
188 self.read_history = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000189 super(MockFileIO, self).__init__(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000190
191 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +0000192 res = super(MockFileIO, self).read(n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000193 self.read_history.append(None if res is None else len(res))
194 return res
195
Antoine Pitrou19690592009-06-12 20:14:08 +0000196 def readinto(self, b):
197 res = super(MockFileIO, self).readinto(b)
198 self.read_history.append(res)
199 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000200
Antoine Pitrou19690592009-06-12 20:14:08 +0000201class CMockFileIO(MockFileIO, io.BytesIO):
202 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000203
Antoine Pitrou19690592009-06-12 20:14:08 +0000204class PyMockFileIO(MockFileIO, pyio.BytesIO):
205 pass
206
207
208class MockNonBlockWriterIO:
209
210 def __init__(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000211 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000212 self._blocker_char = None
Christian Heimes1a6387e2008-03-26 12:49:49 +0000213
Antoine Pitrou19690592009-06-12 20:14:08 +0000214 def pop_written(self):
215 s = b"".join(self._write_stack)
216 self._write_stack[:] = []
217 return s
218
219 def block_on(self, char):
220 """Block when a given char is encountered."""
221 self._blocker_char = char
222
223 def readable(self):
224 return True
225
226 def seekable(self):
227 return True
Christian Heimes1a6387e2008-03-26 12:49:49 +0000228
229 def writable(self):
230 return True
231
Antoine Pitrou19690592009-06-12 20:14:08 +0000232 def write(self, b):
233 b = bytes(b)
234 n = -1
235 if self._blocker_char:
236 try:
237 n = b.index(self._blocker_char)
238 except ValueError:
239 pass
240 else:
Antoine Pitrou5aa7df32011-11-21 20:16:44 +0100241 if n > 0:
242 # write data up to the first blocker
243 self._write_stack.append(b[:n])
244 return n
245 else:
246 # cancel blocker and indicate would block
247 self._blocker_char = None
248 return None
Antoine Pitrou19690592009-06-12 20:14:08 +0000249 self._write_stack.append(b)
250 return len(b)
251
252class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
253 BlockingIOError = io.BlockingIOError
254
255class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
256 BlockingIOError = pyio.BlockingIOError
257
Christian Heimes1a6387e2008-03-26 12:49:49 +0000258
259class IOTest(unittest.TestCase):
260
Antoine Pitrou19690592009-06-12 20:14:08 +0000261 def setUp(self):
262 support.unlink(support.TESTFN)
263
Christian Heimes1a6387e2008-03-26 12:49:49 +0000264 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000265 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000266
267 def write_ops(self, f):
268 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000269 f.truncate(0)
270 self.assertEqual(f.tell(), 5)
271 f.seek(0)
272
273 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000274 self.assertEqual(f.seek(0), 0)
275 self.assertEqual(f.write(b"Hello."), 6)
276 self.assertEqual(f.tell(), 6)
277 self.assertEqual(f.seek(-1, 1), 5)
278 self.assertEqual(f.tell(), 5)
Martin Panterc9813d82016-06-03 05:59:20 +0000279 buffer = bytearray(b" world\n\n\n")
280 self.assertEqual(f.write(buffer), 9)
281 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Christian Heimes1a6387e2008-03-26 12:49:49 +0000282 self.assertEqual(f.seek(0), 0)
283 self.assertEqual(f.write(b"h"), 1)
284 self.assertEqual(f.seek(-1, 2), 13)
285 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000286
Christian Heimes1a6387e2008-03-26 12:49:49 +0000287 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000288 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000289 self.assertRaises(TypeError, f.seek, 0.0)
290
291 def read_ops(self, f, buffered=False):
292 data = f.read(5)
293 self.assertEqual(data, b"hello")
Martin Panterc9813d82016-06-03 05:59:20 +0000294 data = byteslike(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000295 self.assertEqual(f.readinto(data), 5)
Martin Panterc9813d82016-06-03 05:59:20 +0000296 self.assertEqual(data.tobytes(), b" worl")
297 data = bytearray(5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000298 self.assertEqual(f.readinto(data), 2)
299 self.assertEqual(len(data), 5)
300 self.assertEqual(data[:2], b"d\n")
301 self.assertEqual(f.seek(0), 0)
302 self.assertEqual(f.read(20), b"hello world\n")
303 self.assertEqual(f.read(1), b"")
Martin Panterc9813d82016-06-03 05:59:20 +0000304 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000305 self.assertEqual(f.seek(-6, 2), 6)
306 self.assertEqual(f.read(5), b"world")
307 self.assertEqual(f.read(0), b"")
Martin Panterc9813d82016-06-03 05:59:20 +0000308 self.assertEqual(f.readinto(byteslike()), 0)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000309 self.assertEqual(f.seek(-6, 1), 5)
310 self.assertEqual(f.read(5), b" worl")
311 self.assertEqual(f.tell(), 10)
312 self.assertRaises(TypeError, f.seek, 0.0)
313 if buffered:
314 f.seek(0)
315 self.assertEqual(f.read(), b"hello world\n")
316 f.seek(6)
317 self.assertEqual(f.read(), b"world\n")
318 self.assertEqual(f.read(), b"")
319
320 LARGE = 2**31
321
322 def large_file_ops(self, f):
323 assert f.readable()
324 assert f.writable()
325 self.assertEqual(f.seek(self.LARGE), self.LARGE)
326 self.assertEqual(f.tell(), self.LARGE)
327 self.assertEqual(f.write(b"xxx"), 3)
328 self.assertEqual(f.tell(), self.LARGE + 3)
329 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
330 self.assertEqual(f.truncate(), self.LARGE + 2)
331 self.assertEqual(f.tell(), self.LARGE + 2)
332 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
333 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000334 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000335 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
336 self.assertEqual(f.seek(-1, 2), self.LARGE)
337 self.assertEqual(f.read(2), b"x")
338
Antoine Pitrou19690592009-06-12 20:14:08 +0000339 def test_invalid_operations(self):
340 # Try writing on a file opened in read mode and vice-versa.
341 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000342 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000343 self.assertRaises(IOError, fp.read)
344 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000345 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000346 self.assertRaises(IOError, fp.write, b"blah")
347 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000348 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000349 self.assertRaises(IOError, fp.write, "blah")
350 self.assertRaises(IOError, fp.writelines, ["blah\n"])
351
Serhiy Storchaka3c9ce742016-07-01 23:34:44 +0300352 def test_open_handles_NUL_chars(self):
353 fn_with_NUL = 'foo\0bar'
354 self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
355
356 bytes_fn = fn_with_NUL.encode('ascii')
357 with warnings.catch_warnings():
358 warnings.simplefilter("ignore", DeprecationWarning)
359 self.assertRaises(TypeError, self.open, bytes_fn, 'w')
360
Christian Heimes1a6387e2008-03-26 12:49:49 +0000361 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000362 with self.open(support.TESTFN, "wb", buffering=0) as f:
363 self.assertEqual(f.readable(), False)
364 self.assertEqual(f.writable(), True)
365 self.assertEqual(f.seekable(), True)
366 self.write_ops(f)
367 with self.open(support.TESTFN, "rb", buffering=0) as f:
368 self.assertEqual(f.readable(), True)
369 self.assertEqual(f.writable(), False)
370 self.assertEqual(f.seekable(), True)
371 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000372
373 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000374 with self.open(support.TESTFN, "wb") as f:
375 self.assertEqual(f.readable(), False)
376 self.assertEqual(f.writable(), True)
377 self.assertEqual(f.seekable(), True)
378 self.write_ops(f)
379 with self.open(support.TESTFN, "rb") as f:
380 self.assertEqual(f.readable(), True)
381 self.assertEqual(f.writable(), False)
382 self.assertEqual(f.seekable(), True)
383 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000384
385 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000386 with self.open(support.TESTFN, "wb") as f:
387 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
388 with self.open(support.TESTFN, "rb") as f:
389 self.assertEqual(f.readline(), b"abc\n")
390 self.assertEqual(f.readline(10), b"def\n")
391 self.assertEqual(f.readline(2), b"xy")
392 self.assertEqual(f.readline(4), b"zzy\n")
393 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000394 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000395 self.assertRaises(TypeError, f.readline, 5.3)
396 with self.open(support.TESTFN, "r") as f:
397 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000398
Serhiy Storchaka64aa4df2017-04-19 22:34:58 +0300399 def test_readline_nonsizeable(self):
400 # Issue #30061
401 # Crash when readline() returns an object without __len__
402 class R(self.IOBase):
403 def readline(self):
404 return None
405 self.assertRaises((TypeError, StopIteration), next, R())
406
407 def test_next_nonsizeable(self):
408 # Issue #30061
409 # Crash when next() returns an object without __len__
410 class R(self.IOBase):
411 def next(self):
412 return None
413 self.assertRaises(TypeError, R().readlines, 1)
414
Christian Heimes1a6387e2008-03-26 12:49:49 +0000415 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000416 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000417 self.write_ops(f)
418 data = f.getvalue()
419 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000420 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000421 self.read_ops(f, True)
422
423 def test_large_file_ops(self):
424 # On Windows and Mac OSX this test comsumes large resources; It takes
425 # a long time to build the >2GB file and takes >2GB of disk space
426 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000427 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware1f702212013-12-10 14:09:20 -0600428 support.requires(
429 'largefile',
430 'test requires %s bytes and a long time to run' % self.LARGE)
Antoine Pitrou19690592009-06-12 20:14:08 +0000431 with self.open(support.TESTFN, "w+b", 0) as f:
432 self.large_file_ops(f)
433 with self.open(support.TESTFN, "w+b") as f:
434 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000435
436 def test_with_open(self):
437 for bufsize in (0, 1, 100):
438 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000439 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000440 f.write(b"xxx")
441 self.assertEqual(f.closed, True)
442 f = None
443 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000444 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000445 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000446 except ZeroDivisionError:
447 self.assertEqual(f.closed, True)
448 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000449 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000450
Antoine Pitroue741cc62009-01-21 00:45:36 +0000451 # issue 5008
452 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000453 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000454 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000455 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000456 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000457 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000458 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000459 with self.open(support.TESTFN, "a") as f:
Serhiy Storchakaea4d2872015-08-02 15:19:04 +0300460 self.assertGreater(f.tell(), 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000461
Christian Heimes1a6387e2008-03-26 12:49:49 +0000462 def test_destructor(self):
463 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000464 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000465 def __del__(self):
466 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000467 try:
468 f = super(MyFileIO, self).__del__
469 except AttributeError:
470 pass
471 else:
472 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000473 def close(self):
474 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000475 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000476 def flush(self):
477 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000478 super(MyFileIO, self).flush()
479 f = MyFileIO(support.TESTFN, "wb")
480 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000481 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000482 support.gc_collect()
483 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000484 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000485 self.assertEqual(f.read(), b"xxx")
486
487 def _check_base_destructor(self, base):
488 record = []
489 class MyIO(base):
490 def __init__(self):
491 # This exercises the availability of attributes on object
492 # destruction.
493 # (in the C version, close() is called by the tp_dealloc
494 # function, not by __del__)
495 self.on_del = 1
496 self.on_close = 2
497 self.on_flush = 3
498 def __del__(self):
499 record.append(self.on_del)
500 try:
501 f = super(MyIO, self).__del__
502 except AttributeError:
503 pass
504 else:
505 f()
506 def close(self):
507 record.append(self.on_close)
508 super(MyIO, self).close()
509 def flush(self):
510 record.append(self.on_flush)
511 super(MyIO, self).flush()
512 f = MyIO()
513 del f
514 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000515 self.assertEqual(record, [1, 2, 3])
516
Antoine Pitrou19690592009-06-12 20:14:08 +0000517 def test_IOBase_destructor(self):
518 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000519
Antoine Pitrou19690592009-06-12 20:14:08 +0000520 def test_RawIOBase_destructor(self):
521 self._check_base_destructor(self.RawIOBase)
522
523 def test_BufferedIOBase_destructor(self):
524 self._check_base_destructor(self.BufferedIOBase)
525
526 def test_TextIOBase_destructor(self):
527 self._check_base_destructor(self.TextIOBase)
528
529 def test_close_flushes(self):
530 with self.open(support.TESTFN, "wb") as f:
531 f.write(b"xxx")
532 with self.open(support.TESTFN, "rb") as f:
533 self.assertEqual(f.read(), b"xxx")
534
535 def test_array_writes(self):
536 a = array.array(b'i', range(10))
537 n = len(a.tostring())
538 with self.open(support.TESTFN, "wb", 0) as f:
539 self.assertEqual(f.write(a), n)
540 with self.open(support.TESTFN, "wb") as f:
541 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000542
543 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000544 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000545 closefd=False)
546
Antoine Pitrou19690592009-06-12 20:14:08 +0000547 def test_read_closed(self):
548 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000549 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000550 with self.open(support.TESTFN, "r") as f:
551 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000552 self.assertEqual(file.read(), "egg\n")
553 file.seek(0)
554 file.close()
555 self.assertRaises(ValueError, file.read)
556
557 def test_no_closefd_with_filename(self):
558 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000559 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000560
561 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000562 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000563 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000564 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000565 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000566 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000567 self.assertEqual(file.buffer.raw.closefd, False)
568
Antoine Pitrou19690592009-06-12 20:14:08 +0000569 def test_garbage_collection(self):
570 # FileIO objects are collected, and collecting them flushes
571 # all data to disk.
572 f = self.FileIO(support.TESTFN, "wb")
573 f.write(b"abcxxx")
574 f.f = f
575 wr = weakref.ref(f)
576 del f
577 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +0300578 self.assertIsNone(wr(), wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000579 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000580 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000581
Antoine Pitrou19690592009-06-12 20:14:08 +0000582 def test_unbounded_file(self):
583 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
584 zero = "/dev/zero"
585 if not os.path.exists(zero):
586 self.skipTest("{0} does not exist".format(zero))
587 if sys.maxsize > 0x7FFFFFFF:
588 self.skipTest("test can only run in a 32-bit address space")
589 if support.real_max_memuse < support._2G:
590 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000591 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000592 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000593 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000594 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000595 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000596 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000597
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200598 def check_flush_error_on_close(self, *args, **kwargs):
599 # Test that the file is closed despite failed flush
600 # and that flush() is called before file closed.
601 f = self.open(*args, **kwargs)
602 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000603 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200604 closed[:] = [f.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000605 raise IOError()
606 f.flush = bad_flush
607 self.assertRaises(IOError, f.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600608 self.assertTrue(f.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200609 self.assertTrue(closed) # flush() called
610 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchaka437d5352015-02-23 00:28:38 +0200611 f.flush = lambda: None # break reference loop
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200612
613 def test_flush_error_on_close(self):
614 # raw file
615 # Issue #5700: io.FileIO calls flush() after file closed
616 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
617 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
618 self.check_flush_error_on_close(fd, 'wb', buffering=0)
619 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
620 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
621 os.close(fd)
622 # buffered io
623 self.check_flush_error_on_close(support.TESTFN, 'wb')
624 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
625 self.check_flush_error_on_close(fd, 'wb')
626 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
627 self.check_flush_error_on_close(fd, 'wb', closefd=False)
628 os.close(fd)
629 # text io
630 self.check_flush_error_on_close(support.TESTFN, 'w')
631 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
632 self.check_flush_error_on_close(fd, 'w')
633 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
634 self.check_flush_error_on_close(fd, 'w', closefd=False)
635 os.close(fd)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000636
637 def test_multi_close(self):
638 f = self.open(support.TESTFN, "wb", buffering=0)
639 f.close()
640 f.close()
641 f.close()
642 self.assertRaises(ValueError, f.flush)
643
Antoine Pitrou6391b342010-09-14 18:48:19 +0000644 def test_RawIOBase_read(self):
645 # Exercise the default RawIOBase.read() implementation (which calls
646 # readinto() internally).
647 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
648 self.assertEqual(rawio.read(2), b"ab")
649 self.assertEqual(rawio.read(2), b"c")
650 self.assertEqual(rawio.read(2), b"d")
651 self.assertEqual(rawio.read(2), None)
652 self.assertEqual(rawio.read(2), b"ef")
653 self.assertEqual(rawio.read(2), b"g")
654 self.assertEqual(rawio.read(2), None)
655 self.assertEqual(rawio.read(2), b"")
656
Hynek Schlawack877effc2012-05-25 09:24:18 +0200657 def test_fileio_closefd(self):
658 # Issue #4841
659 with self.open(__file__, 'rb') as f1, \
660 self.open(__file__, 'rb') as f2:
661 fileio = self.FileIO(f1.fileno(), closefd=False)
662 # .__init__() must not close f1
663 fileio.__init__(f2.fileno(), closefd=False)
664 f1.readline()
665 # .close() must not close f2
666 fileio.close()
667 f2.readline()
668
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +0300669 def test_nonbuffered_textio(self):
670 with warnings.catch_warnings(record=True) as recorded:
671 with self.assertRaises(ValueError):
672 self.open(support.TESTFN, 'w', buffering=0)
673 support.gc_collect()
674 self.assertEqual(recorded, [])
675
676 def test_invalid_newline(self):
677 with warnings.catch_warnings(record=True) as recorded:
678 with self.assertRaises(ValueError):
679 self.open(support.TESTFN, 'w', newline='invalid')
680 support.gc_collect()
681 self.assertEqual(recorded, [])
682
Martin Panterc9813d82016-06-03 05:59:20 +0000683 def test_buffered_readinto_mixin(self):
684 # Test the implementation provided by BufferedIOBase
685 class Stream(self.BufferedIOBase):
686 def read(self, size):
687 return b"12345"
688 stream = Stream()
689 buffer = byteslike(5)
690 self.assertEqual(stream.readinto(buffer), 5)
691 self.assertEqual(buffer.tobytes(), b"12345")
692
Serhiy Storchakafc153d12018-07-17 18:15:46 +0300693 def test_close_assert(self):
694 class R(self.IOBase):
695 def __setattr__(self, name, value):
696 pass
697 def flush(self):
698 raise OSError()
699 f = R()
700 # This would cause an assertion failure.
701 self.assertRaises(OSError, f.close)
702
Hynek Schlawack877effc2012-05-25 09:24:18 +0200703
Antoine Pitrou19690592009-06-12 20:14:08 +0000704class CIOTest(IOTest):
Antoine Pitrou16166452011-07-12 22:04:20 +0200705
706 def test_IOBase_finalize(self):
707 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
708 # class which inherits IOBase and an object of this class are caught
709 # in a reference cycle and close() is already in the method cache.
710 class MyIO(self.IOBase):
711 def close(self):
712 pass
713
714 # create an instance to populate the method cache
715 MyIO()
716 obj = MyIO()
717 obj.obj = obj
718 wr = weakref.ref(obj)
719 del MyIO
720 del obj
721 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +0300722 self.assertIsNone(wr(), wr)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000723
Antoine Pitrou19690592009-06-12 20:14:08 +0000724class PyIOTest(IOTest):
725 test_array_writes = unittest.skip(
726 "len(array.array) returns number of elements rather than bytelength"
727 )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000728
729
Antoine Pitrou19690592009-06-12 20:14:08 +0000730class CommonBufferedTests:
731 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
732
733 def test_detach(self):
734 raw = self.MockRawIO()
735 buf = self.tp(raw)
736 self.assertIs(buf.detach(), raw)
737 self.assertRaises(ValueError, buf.detach)
738
Benjamin Peterson53ae6142014-12-21 20:51:50 -0600739 repr(buf) # Should still work
740
Antoine Pitrou19690592009-06-12 20:14:08 +0000741 def test_fileno(self):
742 rawio = self.MockRawIO()
743 bufio = self.tp(rawio)
744
Ezio Melotti2623a372010-11-21 13:34:58 +0000745 self.assertEqual(42, bufio.fileno())
Antoine Pitrou19690592009-06-12 20:14:08 +0000746
Antoine Pitrou19690592009-06-12 20:14:08 +0000747 def test_invalid_args(self):
748 rawio = self.MockRawIO()
749 bufio = self.tp(rawio)
750 # Invalid whence
751 self.assertRaises(ValueError, bufio.seek, 0, -1)
752 self.assertRaises(ValueError, bufio.seek, 0, 3)
753
754 def test_override_destructor(self):
755 tp = self.tp
756 record = []
757 class MyBufferedIO(tp):
758 def __del__(self):
759 record.append(1)
760 try:
761 f = super(MyBufferedIO, self).__del__
762 except AttributeError:
763 pass
764 else:
765 f()
766 def close(self):
767 record.append(2)
768 super(MyBufferedIO, self).close()
769 def flush(self):
770 record.append(3)
771 super(MyBufferedIO, self).flush()
772 rawio = self.MockRawIO()
773 bufio = MyBufferedIO(rawio)
774 writable = bufio.writable()
775 del bufio
776 support.gc_collect()
777 if writable:
778 self.assertEqual(record, [1, 2, 3])
779 else:
780 self.assertEqual(record, [1, 2])
781
782 def test_context_manager(self):
783 # Test usability as a context manager
784 rawio = self.MockRawIO()
785 bufio = self.tp(rawio)
786 def _with():
787 with bufio:
788 pass
789 _with()
790 # bufio should now be closed, and using it a second time should raise
791 # a ValueError.
792 self.assertRaises(ValueError, _with)
793
794 def test_error_through_destructor(self):
795 # Test that the exception state is not modified by a destructor,
796 # even if close() fails.
797 rawio = self.CloseFailureIO()
798 def f():
799 self.tp(rawio).xyzzy
800 with support.captured_output("stderr") as s:
801 self.assertRaises(AttributeError, f)
802 s = s.getvalue().strip()
803 if s:
804 # The destructor *may* have printed an unraisable error, check it
805 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000806 self.assertTrue(s.startswith("Exception IOError: "), s)
807 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000808
809 def test_repr(self):
810 raw = self.MockRawIO()
811 b = self.tp(raw)
812 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
813 self.assertEqual(repr(b), "<%s>" % clsname)
814 raw.name = "dummy"
815 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
816 raw.name = b"dummy"
817 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000818
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000819 def test_flush_error_on_close(self):
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200820 # Test that buffered file is closed despite failed flush
821 # and that flush() is called before file closed.
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000822 raw = self.MockRawIO()
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200823 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000824 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200825 closed[:] = [b.closed, raw.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000826 raise IOError()
827 raw.flush = bad_flush
828 b = self.tp(raw)
829 self.assertRaises(IOError, b.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600830 self.assertTrue(b.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200831 self.assertTrue(raw.closed)
832 self.assertTrue(closed) # flush() called
833 self.assertFalse(closed[0]) # flush() called before file closed
834 self.assertFalse(closed[1])
Serhiy Storchaka437d5352015-02-23 00:28:38 +0200835 raw.flush = lambda: None # break reference loop
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600836
837 def test_close_error_on_close(self):
838 raw = self.MockRawIO()
839 def bad_flush():
840 raise IOError('flush')
841 def bad_close():
842 raise IOError('close')
843 raw.close = bad_close
844 b = self.tp(raw)
845 b.flush = bad_flush
846 with self.assertRaises(IOError) as err: # exception not swallowed
847 b.close()
848 self.assertEqual(err.exception.args, ('close',))
849 self.assertFalse(b.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000850
851 def test_multi_close(self):
852 raw = self.MockRawIO()
853 b = self.tp(raw)
854 b.close()
855 b.close()
856 b.close()
857 self.assertRaises(ValueError, b.flush)
858
Antoine Pitroufc9ead62010-12-21 21:26:55 +0000859 def test_readonly_attributes(self):
860 raw = self.MockRawIO()
861 buf = self.tp(raw)
862 x = self.MockRawIO()
863 with self.assertRaises((AttributeError, TypeError)):
864 buf.raw = x
865
Christian Heimes1a6387e2008-03-26 12:49:49 +0000866
Antoine Pitroubff5df02012-07-29 19:02:46 +0200867class SizeofTest:
868
869 @support.cpython_only
870 def test_sizeof(self):
871 bufsize1 = 4096
872 bufsize2 = 8192
873 rawio = self.MockRawIO()
874 bufio = self.tp(rawio, buffer_size=bufsize1)
875 size = sys.getsizeof(bufio) - bufsize1
876 rawio = self.MockRawIO()
877 bufio = self.tp(rawio, buffer_size=bufsize2)
878 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
879
880
Antoine Pitrou19690592009-06-12 20:14:08 +0000881class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
882 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000883
Antoine Pitrou19690592009-06-12 20:14:08 +0000884 def test_constructor(self):
885 rawio = self.MockRawIO([b"abc"])
886 bufio = self.tp(rawio)
887 bufio.__init__(rawio)
888 bufio.__init__(rawio, buffer_size=1024)
889 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000890 self.assertEqual(b"abc", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000891 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
892 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
893 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
894 rawio = self.MockRawIO([b"abc"])
895 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000896 self.assertEqual(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000897
Serhiy Storchaka1d19f972014-02-12 10:52:07 +0200898 def test_uninitialized(self):
899 bufio = self.tp.__new__(self.tp)
900 del bufio
901 bufio = self.tp.__new__(self.tp)
902 self.assertRaisesRegexp((ValueError, AttributeError),
903 'uninitialized|has no attribute',
904 bufio.read, 0)
905 bufio.__init__(self.MockRawIO())
906 self.assertEqual(bufio.read(0), b'')
907
Antoine Pitrou19690592009-06-12 20:14:08 +0000908 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000909 for arg in (None, 7):
910 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
911 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000912 self.assertEqual(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000913 # Invalid args
914 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000915
Antoine Pitrou19690592009-06-12 20:14:08 +0000916 def test_read1(self):
917 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
918 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000919 self.assertEqual(b"a", bufio.read(1))
920 self.assertEqual(b"b", bufio.read1(1))
921 self.assertEqual(rawio._reads, 1)
922 self.assertEqual(b"c", bufio.read1(100))
923 self.assertEqual(rawio._reads, 1)
924 self.assertEqual(b"d", bufio.read1(100))
925 self.assertEqual(rawio._reads, 2)
926 self.assertEqual(b"efg", bufio.read1(100))
927 self.assertEqual(rawio._reads, 3)
928 self.assertEqual(b"", bufio.read1(100))
929 self.assertEqual(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000930 # Invalid args
931 self.assertRaises(ValueError, bufio.read1, -1)
932
933 def test_readinto(self):
934 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
935 bufio = self.tp(rawio)
936 b = bytearray(2)
Ezio Melotti2623a372010-11-21 13:34:58 +0000937 self.assertEqual(bufio.readinto(b), 2)
938 self.assertEqual(b, b"ab")
939 self.assertEqual(bufio.readinto(b), 2)
940 self.assertEqual(b, b"cd")
941 self.assertEqual(bufio.readinto(b), 2)
942 self.assertEqual(b, b"ef")
943 self.assertEqual(bufio.readinto(b), 1)
944 self.assertEqual(b, b"gf")
945 self.assertEqual(bufio.readinto(b), 0)
946 self.assertEqual(b, b"gf")
Antoine Pitrou19690592009-06-12 20:14:08 +0000947
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000948 def test_readlines(self):
949 def bufio():
950 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
951 return self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000952 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
953 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
954 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000955
Antoine Pitrou19690592009-06-12 20:14:08 +0000956 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000957 data = b"abcdefghi"
958 dlen = len(data)
959
960 tests = [
961 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
962 [ 100, [ 3, 3, 3], [ dlen ] ],
963 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
964 ]
965
966 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000967 rawio = self.MockFileIO(data)
968 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000969 pos = 0
970 for nbytes in buf_read_sizes:
Ezio Melotti2623a372010-11-21 13:34:58 +0000971 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Christian Heimes1a6387e2008-03-26 12:49:49 +0000972 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000973 # this is mildly implementation-dependent
Ezio Melotti2623a372010-11-21 13:34:58 +0000974 self.assertEqual(rawio.read_history, raw_read_sizes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000975
Antoine Pitrou19690592009-06-12 20:14:08 +0000976 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000977 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000978 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
979 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000980 self.assertEqual(b"abcd", bufio.read(6))
981 self.assertEqual(b"e", bufio.read(1))
982 self.assertEqual(b"fg", bufio.read())
983 self.assertEqual(b"", bufio.peek(1))
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200984 self.assertIsNone(bufio.read())
Ezio Melotti2623a372010-11-21 13:34:58 +0000985 self.assertEqual(b"", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000986
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200987 rawio = self.MockRawIO((b"a", None, None))
988 self.assertEqual(b"a", rawio.readall())
989 self.assertIsNone(rawio.readall())
990
Antoine Pitrou19690592009-06-12 20:14:08 +0000991 def test_read_past_eof(self):
992 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
993 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000994
Ezio Melotti2623a372010-11-21 13:34:58 +0000995 self.assertEqual(b"abcdefg", bufio.read(9000))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000996
Antoine Pitrou19690592009-06-12 20:14:08 +0000997 def test_read_all(self):
998 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
999 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001000
Ezio Melotti2623a372010-11-21 13:34:58 +00001001 self.assertEqual(b"abcdefg", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001002
Victor Stinner6a102812010-04-27 23:55:59 +00001003 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +00001004 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +00001005 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001006 try:
1007 # Write out many bytes with exactly the same number of 0's,
1008 # 1's... 255's. This will help us check that concurrent reading
1009 # doesn't duplicate or forget contents.
1010 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +00001011 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001012 random.shuffle(l)
1013 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001014 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001015 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001016 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001017 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001018 errors = []
1019 results = []
1020 def f():
1021 try:
1022 # Intra-buffer read then buffer-flushing read
1023 for n in cycle([1, 19]):
1024 s = bufio.read(n)
1025 if not s:
1026 break
1027 # list.append() is atomic
1028 results.append(s)
1029 except Exception as e:
1030 errors.append(e)
1031 raise
1032 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03001033 with support.start_threads(threads):
1034 time.sleep(0.02) # yield
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001035 self.assertFalse(errors,
1036 "the following exceptions were caught: %r" % errors)
1037 s = b''.join(results)
1038 for i in range(256):
1039 c = bytes(bytearray([i]))
1040 self.assertEqual(s.count(c), N)
1041 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001042 support.unlink(support.TESTFN)
1043
1044 def test_misbehaved_io(self):
1045 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1046 bufio = self.tp(rawio)
1047 self.assertRaises(IOError, bufio.seek, 0)
1048 self.assertRaises(IOError, bufio.tell)
1049
Antoine Pitroucb4f47c2010-08-11 13:40:17 +00001050 def test_no_extraneous_read(self):
1051 # Issue #9550; when the raw IO object has satisfied the read request,
1052 # we should not issue any additional reads, otherwise it may block
1053 # (e.g. socket).
1054 bufsize = 16
1055 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1056 rawio = self.MockRawIO([b"x" * n])
1057 bufio = self.tp(rawio, bufsize)
1058 self.assertEqual(bufio.read(n), b"x" * n)
1059 # Simple case: one raw read is enough to satisfy the request.
1060 self.assertEqual(rawio._extraneous_reads, 0,
1061 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1062 # A more complex case where two raw reads are needed to satisfy
1063 # the request.
1064 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1065 bufio = self.tp(rawio, bufsize)
1066 self.assertEqual(bufio.read(n), b"x" * n)
1067 self.assertEqual(rawio._extraneous_reads, 0,
1068 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1069
1070
Antoine Pitroubff5df02012-07-29 19:02:46 +02001071class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001072 tp = io.BufferedReader
1073
1074 def test_constructor(self):
1075 BufferedReaderTest.test_constructor(self)
1076 # The allocation can succeed on 32-bit builds, e.g. with more
1077 # than 2GB RAM and a 64-bit kernel.
1078 if sys.maxsize > 0x7FFFFFFF:
1079 rawio = self.MockRawIO()
1080 bufio = self.tp(rawio)
1081 self.assertRaises((OverflowError, MemoryError, ValueError),
1082 bufio.__init__, rawio, sys.maxsize)
1083
1084 def test_initialization(self):
1085 rawio = self.MockRawIO([b"abc"])
1086 bufio = self.tp(rawio)
1087 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1088 self.assertRaises(ValueError, bufio.read)
1089 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1090 self.assertRaises(ValueError, bufio.read)
1091 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1092 self.assertRaises(ValueError, bufio.read)
1093
1094 def test_misbehaved_io_read(self):
1095 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1096 bufio = self.tp(rawio)
1097 # _pyio.BufferedReader seems to implement reading different, so that
1098 # checking this is not so easy.
1099 self.assertRaises(IOError, bufio.read, 10)
1100
1101 def test_garbage_collection(self):
1102 # C BufferedReader objects are collected.
1103 # The Python version has __del__, so it ends into gc.garbage instead
Serhiy Storchakab02ceb52018-06-04 06:37:57 +03001104 self.addCleanup(support.unlink, support.TESTFN)
Antoine Pitrou19690592009-06-12 20:14:08 +00001105 rawio = self.FileIO(support.TESTFN, "w+b")
1106 f = self.tp(rawio)
1107 f.f = f
1108 wr = weakref.ref(f)
1109 del f
1110 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03001111 self.assertIsNone(wr(), wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00001112
R David Murray5b2cf5e2013-02-23 22:11:21 -05001113 def test_args_error(self):
1114 # Issue #17275
1115 with self.assertRaisesRegexp(TypeError, "BufferedReader"):
1116 self.tp(io.BytesIO(), 1024, 1024, 1024)
1117
1118
Antoine Pitrou19690592009-06-12 20:14:08 +00001119class PyBufferedReaderTest(BufferedReaderTest):
1120 tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001121
1122
Antoine Pitrou19690592009-06-12 20:14:08 +00001123class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1124 write_mode = "wb"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001125
Antoine Pitrou19690592009-06-12 20:14:08 +00001126 def test_constructor(self):
1127 rawio = self.MockRawIO()
1128 bufio = self.tp(rawio)
1129 bufio.__init__(rawio)
1130 bufio.__init__(rawio, buffer_size=1024)
1131 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +00001132 self.assertEqual(3, bufio.write(b"abc"))
Antoine Pitrou19690592009-06-12 20:14:08 +00001133 bufio.flush()
1134 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1135 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1136 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1137 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +00001138 self.assertEqual(3, bufio.write(b"ghi"))
Antoine Pitrou19690592009-06-12 20:14:08 +00001139 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001140 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001141
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001142 def test_uninitialized(self):
1143 bufio = self.tp.__new__(self.tp)
1144 del bufio
1145 bufio = self.tp.__new__(self.tp)
1146 self.assertRaisesRegexp((ValueError, AttributeError),
1147 'uninitialized|has no attribute',
1148 bufio.write, b'')
1149 bufio.__init__(self.MockRawIO())
1150 self.assertEqual(bufio.write(b''), 0)
1151
Antoine Pitrou19690592009-06-12 20:14:08 +00001152 def test_detach_flush(self):
1153 raw = self.MockRawIO()
1154 buf = self.tp(raw)
1155 buf.write(b"howdy!")
1156 self.assertFalse(raw._write_stack)
1157 buf.detach()
1158 self.assertEqual(raw._write_stack, [b"howdy!"])
1159
1160 def test_write(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001161 # Write to the buffered IO but don't overflow the buffer.
Antoine Pitrou19690592009-06-12 20:14:08 +00001162 writer = self.MockRawIO()
1163 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001164 bufio.write(b"abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001165 self.assertFalse(writer._write_stack)
Martin Panterc9813d82016-06-03 05:59:20 +00001166 buffer = bytearray(b"def")
1167 bufio.write(buffer)
1168 buffer[:] = b"***" # Overwrite our copy of the data
1169 bufio.flush()
1170 self.assertEqual(b"".join(writer._write_stack), b"abcdef")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001171
Antoine Pitrou19690592009-06-12 20:14:08 +00001172 def test_write_overflow(self):
1173 writer = self.MockRawIO()
1174 bufio = self.tp(writer, 8)
1175 contents = b"abcdefghijklmnop"
1176 for n in range(0, len(contents), 3):
1177 bufio.write(contents[n:n+3])
1178 flushed = b"".join(writer._write_stack)
1179 # At least (total - 8) bytes were implicitly flushed, perhaps more
1180 # depending on the implementation.
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001181 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001182
Antoine Pitrou19690592009-06-12 20:14:08 +00001183 def check_writes(self, intermediate_func):
1184 # Lots of writes, test the flushed output is as expected.
1185 contents = bytes(range(256)) * 1000
1186 n = 0
1187 writer = self.MockRawIO()
1188 bufio = self.tp(writer, 13)
1189 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1190 def gen_sizes():
1191 for size in count(1):
1192 for i in range(15):
1193 yield size
1194 sizes = gen_sizes()
1195 while n < len(contents):
1196 size = min(next(sizes), len(contents) - n)
Ezio Melotti2623a372010-11-21 13:34:58 +00001197 self.assertEqual(bufio.write(contents[n:n+size]), size)
Antoine Pitrou19690592009-06-12 20:14:08 +00001198 intermediate_func(bufio)
1199 n += size
1200 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001201 self.assertEqual(contents,
Antoine Pitrou19690592009-06-12 20:14:08 +00001202 b"".join(writer._write_stack))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001203
Antoine Pitrou19690592009-06-12 20:14:08 +00001204 def test_writes(self):
1205 self.check_writes(lambda bufio: None)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001206
Antoine Pitrou19690592009-06-12 20:14:08 +00001207 def test_writes_and_flushes(self):
1208 self.check_writes(lambda bufio: bufio.flush())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001209
Antoine Pitrou19690592009-06-12 20:14:08 +00001210 def test_writes_and_seeks(self):
1211 def _seekabs(bufio):
1212 pos = bufio.tell()
1213 bufio.seek(pos + 1, 0)
1214 bufio.seek(pos - 1, 0)
1215 bufio.seek(pos, 0)
1216 self.check_writes(_seekabs)
1217 def _seekrel(bufio):
1218 pos = bufio.seek(0, 1)
1219 bufio.seek(+1, 1)
1220 bufio.seek(-1, 1)
1221 bufio.seek(pos, 0)
1222 self.check_writes(_seekrel)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001223
Antoine Pitrou19690592009-06-12 20:14:08 +00001224 def test_writes_and_truncates(self):
1225 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001226
Antoine Pitrou19690592009-06-12 20:14:08 +00001227 def test_write_non_blocking(self):
1228 raw = self.MockNonBlockWriterIO()
1229 bufio = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001230
Ezio Melotti2623a372010-11-21 13:34:58 +00001231 self.assertEqual(bufio.write(b"abcd"), 4)
1232 self.assertEqual(bufio.write(b"efghi"), 5)
Antoine Pitrou19690592009-06-12 20:14:08 +00001233 # 1 byte will be written, the rest will be buffered
1234 raw.block_on(b"k")
Ezio Melotti2623a372010-11-21 13:34:58 +00001235 self.assertEqual(bufio.write(b"jklmn"), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001236
Antoine Pitrou19690592009-06-12 20:14:08 +00001237 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1238 raw.block_on(b"0")
1239 try:
1240 bufio.write(b"opqrwxyz0123456789")
1241 except self.BlockingIOError as e:
1242 written = e.characters_written
1243 else:
1244 self.fail("BlockingIOError should have been raised")
Ezio Melotti2623a372010-11-21 13:34:58 +00001245 self.assertEqual(written, 16)
1246 self.assertEqual(raw.pop_written(),
Antoine Pitrou19690592009-06-12 20:14:08 +00001247 b"abcdefghijklmnopqrwxyz")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001248
Ezio Melotti2623a372010-11-21 13:34:58 +00001249 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Antoine Pitrou19690592009-06-12 20:14:08 +00001250 s = raw.pop_written()
1251 # Previously buffered bytes were flushed
1252 self.assertTrue(s.startswith(b"01234567A"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001253
Antoine Pitrou19690592009-06-12 20:14:08 +00001254 def test_write_and_rewind(self):
1255 raw = io.BytesIO()
1256 bufio = self.tp(raw, 4)
1257 self.assertEqual(bufio.write(b"abcdef"), 6)
1258 self.assertEqual(bufio.tell(), 6)
1259 bufio.seek(0, 0)
1260 self.assertEqual(bufio.write(b"XY"), 2)
1261 bufio.seek(6, 0)
1262 self.assertEqual(raw.getvalue(), b"XYcdef")
1263 self.assertEqual(bufio.write(b"123456"), 6)
1264 bufio.flush()
1265 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001266
Antoine Pitrou19690592009-06-12 20:14:08 +00001267 def test_flush(self):
1268 writer = self.MockRawIO()
1269 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001270 bufio.write(b"abc")
1271 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001272 self.assertEqual(b"abc", writer._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001273
Antoine Pitrou78e761e2012-10-16 22:57:11 +02001274 def test_writelines(self):
1275 l = [b'ab', b'cd', b'ef']
1276 writer = self.MockRawIO()
1277 bufio = self.tp(writer, 8)
1278 bufio.writelines(l)
1279 bufio.flush()
1280 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1281
1282 def test_writelines_userlist(self):
1283 l = UserList([b'ab', b'cd', b'ef'])
1284 writer = self.MockRawIO()
1285 bufio = self.tp(writer, 8)
1286 bufio.writelines(l)
1287 bufio.flush()
1288 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1289
1290 def test_writelines_error(self):
1291 writer = self.MockRawIO()
1292 bufio = self.tp(writer, 8)
1293 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1294 self.assertRaises(TypeError, bufio.writelines, None)
1295
Antoine Pitrou19690592009-06-12 20:14:08 +00001296 def test_destructor(self):
1297 writer = self.MockRawIO()
1298 bufio = self.tp(writer, 8)
1299 bufio.write(b"abc")
1300 del bufio
1301 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001302 self.assertEqual(b"abc", writer._write_stack[0])
Antoine Pitrou19690592009-06-12 20:14:08 +00001303
1304 def test_truncate(self):
1305 # Truncate implicitly flushes the buffer.
Serhiy Storchakab02ceb52018-06-04 06:37:57 +03001306 self.addCleanup(support.unlink, support.TESTFN)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001307 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001308 bufio = self.tp(raw, 8)
1309 bufio.write(b"abcdef")
1310 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001311 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001312 with self.open(support.TESTFN, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001313 self.assertEqual(f.read(), b"abc")
1314
Victor Stinner6a102812010-04-27 23:55:59 +00001315 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +00001316 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +00001317 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001318 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001319 # Write out many bytes from many threads and test they were
1320 # all flushed.
1321 N = 1000
1322 contents = bytes(range(256)) * N
1323 sizes = cycle([1, 19])
1324 n = 0
1325 queue = deque()
1326 while n < len(contents):
1327 size = next(sizes)
1328 queue.append(contents[n:n+size])
1329 n += size
1330 del contents
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001331 # We use a real file object because it allows us to
1332 # exercise situations where the GIL is released before
1333 # writing the buffer to the raw streams. This is in addition
1334 # to concurrency issues due to switching threads in the middle
1335 # of Python code.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001336 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001337 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001338 errors = []
1339 def f():
1340 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001341 while True:
1342 try:
1343 s = queue.popleft()
1344 except IndexError:
1345 return
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001346 bufio.write(s)
1347 except Exception as e:
1348 errors.append(e)
1349 raise
1350 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03001351 with support.start_threads(threads):
1352 time.sleep(0.02) # yield
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001353 self.assertFalse(errors,
1354 "the following exceptions were caught: %r" % errors)
Antoine Pitrou19690592009-06-12 20:14:08 +00001355 bufio.close()
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001356 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001357 s = f.read()
1358 for i in range(256):
Ezio Melotti2623a372010-11-21 13:34:58 +00001359 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001360 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001361 support.unlink(support.TESTFN)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001362
Antoine Pitrou19690592009-06-12 20:14:08 +00001363 def test_misbehaved_io(self):
1364 rawio = self.MisbehavedRawIO()
1365 bufio = self.tp(rawio, 5)
1366 self.assertRaises(IOError, bufio.seek, 0)
1367 self.assertRaises(IOError, bufio.tell)
1368 self.assertRaises(IOError, bufio.write, b"abcdef")
1369
1370 def test_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001371 with support.check_warnings(("max_buffer_size is deprecated",
1372 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001373 self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001374
Benjamin Petersona2d6d712012-12-20 12:24:10 -06001375 def test_write_error_on_close(self):
1376 raw = self.MockRawIO()
1377 def bad_write(b):
1378 raise IOError()
1379 raw.write = bad_write
1380 b = self.tp(raw)
1381 b.write(b'spam')
1382 self.assertRaises(IOError, b.close) # exception not swallowed
1383 self.assertTrue(b.closed)
1384
Antoine Pitrou19690592009-06-12 20:14:08 +00001385
Antoine Pitroubff5df02012-07-29 19:02:46 +02001386class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001387 tp = io.BufferedWriter
1388
1389 def test_constructor(self):
1390 BufferedWriterTest.test_constructor(self)
1391 # The allocation can succeed on 32-bit builds, e.g. with more
1392 # than 2GB RAM and a 64-bit kernel.
1393 if sys.maxsize > 0x7FFFFFFF:
1394 rawio = self.MockRawIO()
1395 bufio = self.tp(rawio)
1396 self.assertRaises((OverflowError, MemoryError, ValueError),
1397 bufio.__init__, rawio, sys.maxsize)
1398
1399 def test_initialization(self):
1400 rawio = self.MockRawIO()
1401 bufio = self.tp(rawio)
1402 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1403 self.assertRaises(ValueError, bufio.write, b"def")
1404 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1405 self.assertRaises(ValueError, bufio.write, b"def")
1406 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1407 self.assertRaises(ValueError, bufio.write, b"def")
1408
1409 def test_garbage_collection(self):
1410 # C BufferedWriter objects are collected, and collecting them flushes
1411 # all data to disk.
1412 # The Python version has __del__, so it ends into gc.garbage instead
Serhiy Storchakab02ceb52018-06-04 06:37:57 +03001413 self.addCleanup(support.unlink, support.TESTFN)
Antoine Pitrou19690592009-06-12 20:14:08 +00001414 rawio = self.FileIO(support.TESTFN, "w+b")
1415 f = self.tp(rawio)
1416 f.write(b"123xxx")
1417 f.x = f
1418 wr = weakref.ref(f)
1419 del f
1420 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03001421 self.assertIsNone(wr(), wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001422 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001423 self.assertEqual(f.read(), b"123xxx")
1424
R David Murray5b2cf5e2013-02-23 22:11:21 -05001425 def test_args_error(self):
1426 # Issue #17275
1427 with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
1428 self.tp(io.BytesIO(), 1024, 1024, 1024)
1429
Antoine Pitrou19690592009-06-12 20:14:08 +00001430
1431class PyBufferedWriterTest(BufferedWriterTest):
1432 tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001433
1434class BufferedRWPairTest(unittest.TestCase):
1435
Antoine Pitrou19690592009-06-12 20:14:08 +00001436 def test_constructor(self):
1437 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001438 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001439
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001440 def test_uninitialized(self):
1441 pair = self.tp.__new__(self.tp)
1442 del pair
1443 pair = self.tp.__new__(self.tp)
1444 self.assertRaisesRegexp((ValueError, AttributeError),
1445 'uninitialized|has no attribute',
1446 pair.read, 0)
1447 self.assertRaisesRegexp((ValueError, AttributeError),
1448 'uninitialized|has no attribute',
1449 pair.write, b'')
1450 pair.__init__(self.MockRawIO(), self.MockRawIO())
1451 self.assertEqual(pair.read(0), b'')
1452 self.assertEqual(pair.write(b''), 0)
1453
Antoine Pitrou19690592009-06-12 20:14:08 +00001454 def test_detach(self):
1455 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1456 self.assertRaises(self.UnsupportedOperation, pair.detach)
1457
1458 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001459 with support.check_warnings(("max_buffer_size is deprecated",
1460 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001461 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001462
1463 def test_constructor_with_not_readable(self):
1464 class NotReadable(MockRawIO):
1465 def readable(self):
1466 return False
1467
1468 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1469
1470 def test_constructor_with_not_writeable(self):
1471 class NotWriteable(MockRawIO):
1472 def writable(self):
1473 return False
1474
1475 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1476
1477 def test_read(self):
1478 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1479
1480 self.assertEqual(pair.read(3), b"abc")
1481 self.assertEqual(pair.read(1), b"d")
1482 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001483 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1484 self.assertEqual(pair.read(None), b"abc")
1485
1486 def test_readlines(self):
1487 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1488 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1489 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1490 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001491
1492 def test_read1(self):
1493 # .read1() is delegated to the underlying reader object, so this test
1494 # can be shallow.
1495 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1496
1497 self.assertEqual(pair.read1(3), b"abc")
1498
1499 def test_readinto(self):
1500 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1501
Martin Panterc9813d82016-06-03 05:59:20 +00001502 data = byteslike(5)
Antoine Pitrou19690592009-06-12 20:14:08 +00001503 self.assertEqual(pair.readinto(data), 5)
Martin Panterc9813d82016-06-03 05:59:20 +00001504 self.assertEqual(data.tobytes(), b"abcde")
Antoine Pitrou19690592009-06-12 20:14:08 +00001505
1506 def test_write(self):
1507 w = self.MockRawIO()
1508 pair = self.tp(self.MockRawIO(), w)
1509
1510 pair.write(b"abc")
1511 pair.flush()
Martin Panterc9813d82016-06-03 05:59:20 +00001512 buffer = bytearray(b"def")
1513 pair.write(buffer)
1514 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitrou19690592009-06-12 20:14:08 +00001515 pair.flush()
1516 self.assertEqual(w._write_stack, [b"abc", b"def"])
1517
1518 def test_peek(self):
1519 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1520
1521 self.assertTrue(pair.peek(3).startswith(b"abc"))
1522 self.assertEqual(pair.read(3), b"abc")
1523
1524 def test_readable(self):
1525 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1526 self.assertTrue(pair.readable())
1527
1528 def test_writeable(self):
1529 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1530 self.assertTrue(pair.writable())
1531
1532 def test_seekable(self):
1533 # BufferedRWPairs are never seekable, even if their readers and writers
1534 # are.
1535 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1536 self.assertFalse(pair.seekable())
1537
1538 # .flush() is delegated to the underlying writer object and has been
1539 # tested in the test_write method.
1540
1541 def test_close_and_closed(self):
1542 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1543 self.assertFalse(pair.closed)
1544 pair.close()
1545 self.assertTrue(pair.closed)
1546
Serhiy Storchakaf95a57f2015-03-24 23:23:42 +02001547 def test_reader_close_error_on_close(self):
1548 def reader_close():
1549 reader_non_existing
1550 reader = self.MockRawIO()
1551 reader.close = reader_close
1552 writer = self.MockRawIO()
1553 pair = self.tp(reader, writer)
1554 with self.assertRaises(NameError) as err:
1555 pair.close()
1556 self.assertIn('reader_non_existing', str(err.exception))
1557 self.assertTrue(pair.closed)
1558 self.assertFalse(reader.closed)
1559 self.assertTrue(writer.closed)
1560
1561 def test_writer_close_error_on_close(self):
1562 def writer_close():
1563 writer_non_existing
1564 reader = self.MockRawIO()
1565 writer = self.MockRawIO()
1566 writer.close = writer_close
1567 pair = self.tp(reader, writer)
1568 with self.assertRaises(NameError) as err:
1569 pair.close()
1570 self.assertIn('writer_non_existing', str(err.exception))
1571 self.assertFalse(pair.closed)
1572 self.assertTrue(reader.closed)
1573 self.assertFalse(writer.closed)
1574
1575 def test_reader_writer_close_error_on_close(self):
1576 def reader_close():
1577 reader_non_existing
1578 def writer_close():
1579 writer_non_existing
1580 reader = self.MockRawIO()
1581 reader.close = reader_close
1582 writer = self.MockRawIO()
1583 writer.close = writer_close
1584 pair = self.tp(reader, writer)
1585 with self.assertRaises(NameError) as err:
1586 pair.close()
1587 self.assertIn('reader_non_existing', str(err.exception))
1588 self.assertFalse(pair.closed)
1589 self.assertFalse(reader.closed)
1590 self.assertFalse(writer.closed)
1591
Antoine Pitrou19690592009-06-12 20:14:08 +00001592 def test_isatty(self):
1593 class SelectableIsAtty(MockRawIO):
1594 def __init__(self, isatty):
1595 MockRawIO.__init__(self)
1596 self._isatty = isatty
1597
1598 def isatty(self):
1599 return self._isatty
1600
1601 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1602 self.assertFalse(pair.isatty())
1603
1604 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1605 self.assertTrue(pair.isatty())
1606
1607 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1608 self.assertTrue(pair.isatty())
1609
1610 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1611 self.assertTrue(pair.isatty())
1612
Benjamin Peterson1c873bf2014-09-29 22:46:57 -04001613 def test_weakref_clearing(self):
1614 brw = self.tp(self.MockRawIO(), self.MockRawIO())
1615 ref = weakref.ref(brw)
1616 brw = None
1617 ref = None # Shouldn't segfault.
1618
Antoine Pitrou19690592009-06-12 20:14:08 +00001619class CBufferedRWPairTest(BufferedRWPairTest):
1620 tp = io.BufferedRWPair
1621
1622class PyBufferedRWPairTest(BufferedRWPairTest):
1623 tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001624
1625
Antoine Pitrou19690592009-06-12 20:14:08 +00001626class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1627 read_mode = "rb+"
1628 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001629
Antoine Pitrou19690592009-06-12 20:14:08 +00001630 def test_constructor(self):
1631 BufferedReaderTest.test_constructor(self)
1632 BufferedWriterTest.test_constructor(self)
1633
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001634 def test_uninitialized(self):
1635 BufferedReaderTest.test_uninitialized(self)
1636 BufferedWriterTest.test_uninitialized(self)
1637
Antoine Pitrou19690592009-06-12 20:14:08 +00001638 def test_read_and_write(self):
1639 raw = self.MockRawIO((b"asdf", b"ghjk"))
1640 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001641
1642 self.assertEqual(b"as", rw.read(2))
1643 rw.write(b"ddd")
1644 rw.write(b"eee")
1645 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001646 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001647 self.assertEqual(b"dddeee", raw._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001648
Antoine Pitrou19690592009-06-12 20:14:08 +00001649 def test_seek_and_tell(self):
1650 raw = self.BytesIO(b"asdfghjkl")
1651 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001652
Ezio Melotti2623a372010-11-21 13:34:58 +00001653 self.assertEqual(b"as", rw.read(2))
1654 self.assertEqual(2, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001655 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001656 self.assertEqual(b"asdf", rw.read(4))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001657
Antoine Pitrou808cec52011-08-20 15:40:58 +02001658 rw.write(b"123f")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001659 rw.seek(0, 0)
Antoine Pitrou808cec52011-08-20 15:40:58 +02001660 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001661 self.assertEqual(9, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001662 rw.seek(-4, 2)
Ezio Melotti2623a372010-11-21 13:34:58 +00001663 self.assertEqual(5, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001664 rw.seek(2, 1)
Ezio Melotti2623a372010-11-21 13:34:58 +00001665 self.assertEqual(7, rw.tell())
1666 self.assertEqual(b"fl", rw.read(11))
Antoine Pitrou808cec52011-08-20 15:40:58 +02001667 rw.flush()
1668 self.assertEqual(b"asdf123fl", raw.getvalue())
1669
Christian Heimes1a6387e2008-03-26 12:49:49 +00001670 self.assertRaises(TypeError, rw.seek, 0.0)
1671
Antoine Pitrou19690592009-06-12 20:14:08 +00001672 def check_flush_and_read(self, read_func):
1673 raw = self.BytesIO(b"abcdefghi")
1674 bufio = self.tp(raw)
1675
Ezio Melotti2623a372010-11-21 13:34:58 +00001676 self.assertEqual(b"ab", read_func(bufio, 2))
Antoine Pitrou19690592009-06-12 20:14:08 +00001677 bufio.write(b"12")
Ezio Melotti2623a372010-11-21 13:34:58 +00001678 self.assertEqual(b"ef", read_func(bufio, 2))
1679 self.assertEqual(6, bufio.tell())
Antoine Pitrou19690592009-06-12 20:14:08 +00001680 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001681 self.assertEqual(6, bufio.tell())
1682 self.assertEqual(b"ghi", read_func(bufio))
Antoine Pitrou19690592009-06-12 20:14:08 +00001683 raw.seek(0, 0)
1684 raw.write(b"XYZ")
1685 # flush() resets the read buffer
1686 bufio.flush()
1687 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001688 self.assertEqual(b"XYZ", read_func(bufio, 3))
Antoine Pitrou19690592009-06-12 20:14:08 +00001689
1690 def test_flush_and_read(self):
1691 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1692
1693 def test_flush_and_readinto(self):
1694 def _readinto(bufio, n=-1):
1695 b = bytearray(n if n >= 0 else 9999)
1696 n = bufio.readinto(b)
1697 return bytes(b[:n])
1698 self.check_flush_and_read(_readinto)
1699
1700 def test_flush_and_peek(self):
1701 def _peek(bufio, n=-1):
1702 # This relies on the fact that the buffer can contain the whole
1703 # raw stream, otherwise peek() can return less.
1704 b = bufio.peek(n)
1705 if n != -1:
1706 b = b[:n]
1707 bufio.seek(len(b), 1)
1708 return b
1709 self.check_flush_and_read(_peek)
1710
1711 def test_flush_and_write(self):
1712 raw = self.BytesIO(b"abcdefghi")
1713 bufio = self.tp(raw)
1714
1715 bufio.write(b"123")
1716 bufio.flush()
1717 bufio.write(b"45")
1718 bufio.flush()
1719 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001720 self.assertEqual(b"12345fghi", raw.getvalue())
1721 self.assertEqual(b"12345fghi", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +00001722
1723 def test_threads(self):
1724 BufferedReaderTest.test_threads(self)
1725 BufferedWriterTest.test_threads(self)
1726
1727 def test_writes_and_peek(self):
1728 def _peek(bufio):
1729 bufio.peek(1)
1730 self.check_writes(_peek)
1731 def _peek(bufio):
1732 pos = bufio.tell()
1733 bufio.seek(-1, 1)
1734 bufio.peek(1)
1735 bufio.seek(pos, 0)
1736 self.check_writes(_peek)
1737
1738 def test_writes_and_reads(self):
1739 def _read(bufio):
1740 bufio.seek(-1, 1)
1741 bufio.read(1)
1742 self.check_writes(_read)
1743
1744 def test_writes_and_read1s(self):
1745 def _read1(bufio):
1746 bufio.seek(-1, 1)
1747 bufio.read1(1)
1748 self.check_writes(_read1)
1749
1750 def test_writes_and_readintos(self):
1751 def _read(bufio):
1752 bufio.seek(-1, 1)
1753 bufio.readinto(bytearray(1))
1754 self.check_writes(_read)
1755
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001756 def test_write_after_readahead(self):
1757 # Issue #6629: writing after the buffer was filled by readahead should
1758 # first rewind the raw stream.
1759 for overwrite_size in [1, 5]:
1760 raw = self.BytesIO(b"A" * 10)
1761 bufio = self.tp(raw, 4)
1762 # Trigger readahead
1763 self.assertEqual(bufio.read(1), b"A")
1764 self.assertEqual(bufio.tell(), 1)
1765 # Overwriting should rewind the raw stream if it needs so
1766 bufio.write(b"B" * overwrite_size)
1767 self.assertEqual(bufio.tell(), overwrite_size + 1)
1768 # If the write size was smaller than the buffer size, flush() and
1769 # check that rewind happens.
1770 bufio.flush()
1771 self.assertEqual(bufio.tell(), overwrite_size + 1)
1772 s = raw.getvalue()
1773 self.assertEqual(s,
1774 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1775
Antoine Pitrouee46a7b2011-05-13 00:31:52 +02001776 def test_write_rewind_write(self):
1777 # Various combinations of reading / writing / seeking backwards / writing again
1778 def mutate(bufio, pos1, pos2):
1779 assert pos2 >= pos1
1780 # Fill the buffer
1781 bufio.seek(pos1)
1782 bufio.read(pos2 - pos1)
1783 bufio.write(b'\x02')
1784 # This writes earlier than the previous write, but still inside
1785 # the buffer.
1786 bufio.seek(pos1)
1787 bufio.write(b'\x01')
1788
1789 b = b"\x80\x81\x82\x83\x84"
1790 for i in range(0, len(b)):
1791 for j in range(i, len(b)):
1792 raw = self.BytesIO(b)
1793 bufio = self.tp(raw, 100)
1794 mutate(bufio, i, j)
1795 bufio.flush()
1796 expected = bytearray(b)
1797 expected[j] = 2
1798 expected[i] = 1
1799 self.assertEqual(raw.getvalue(), expected,
1800 "failed result for i=%d, j=%d" % (i, j))
1801
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001802 def test_truncate_after_read_or_write(self):
1803 raw = self.BytesIO(b"A" * 10)
1804 bufio = self.tp(raw, 100)
1805 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1806 self.assertEqual(bufio.truncate(), 2)
1807 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1808 self.assertEqual(bufio.truncate(), 4)
1809
Antoine Pitrou19690592009-06-12 20:14:08 +00001810 def test_misbehaved_io(self):
1811 BufferedReaderTest.test_misbehaved_io(self)
1812 BufferedWriterTest.test_misbehaved_io(self)
1813
Antoine Pitrou808cec52011-08-20 15:40:58 +02001814 def test_interleaved_read_write(self):
1815 # Test for issue #12213
1816 with self.BytesIO(b'abcdefgh') as raw:
1817 with self.tp(raw, 100) as f:
1818 f.write(b"1")
1819 self.assertEqual(f.read(1), b'b')
1820 f.write(b'2')
1821 self.assertEqual(f.read1(1), b'd')
1822 f.write(b'3')
1823 buf = bytearray(1)
1824 f.readinto(buf)
1825 self.assertEqual(buf, b'f')
1826 f.write(b'4')
1827 self.assertEqual(f.peek(1), b'h')
1828 f.flush()
1829 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1830
1831 with self.BytesIO(b'abc') as raw:
1832 with self.tp(raw, 100) as f:
1833 self.assertEqual(f.read(1), b'a')
1834 f.write(b"2")
1835 self.assertEqual(f.read(1), b'c')
1836 f.flush()
1837 self.assertEqual(raw.getvalue(), b'a2c')
1838
1839 def test_interleaved_readline_write(self):
1840 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1841 with self.tp(raw) as f:
1842 f.write(b'1')
1843 self.assertEqual(f.readline(), b'b\n')
1844 f.write(b'2')
1845 self.assertEqual(f.readline(), b'def\n')
1846 f.write(b'3')
1847 self.assertEqual(f.readline(), b'\n')
1848 f.flush()
1849 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1850
R David Murray5b2cf5e2013-02-23 22:11:21 -05001851
Antoine Pitroubff5df02012-07-29 19:02:46 +02001852class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
1853 BufferedRandomTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001854 tp = io.BufferedRandom
1855
1856 def test_constructor(self):
1857 BufferedRandomTest.test_constructor(self)
1858 # The allocation can succeed on 32-bit builds, e.g. with more
1859 # than 2GB RAM and a 64-bit kernel.
1860 if sys.maxsize > 0x7FFFFFFF:
1861 rawio = self.MockRawIO()
1862 bufio = self.tp(rawio)
1863 self.assertRaises((OverflowError, MemoryError, ValueError),
1864 bufio.__init__, rawio, sys.maxsize)
1865
1866 def test_garbage_collection(self):
1867 CBufferedReaderTest.test_garbage_collection(self)
1868 CBufferedWriterTest.test_garbage_collection(self)
1869
R David Murray5b2cf5e2013-02-23 22:11:21 -05001870 def test_args_error(self):
1871 # Issue #17275
1872 with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
1873 self.tp(io.BytesIO(), 1024, 1024, 1024)
1874
1875
Antoine Pitrou19690592009-06-12 20:14:08 +00001876class PyBufferedRandomTest(BufferedRandomTest):
1877 tp = pyio.BufferedRandom
1878
1879
Christian Heimes1a6387e2008-03-26 12:49:49 +00001880# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1881# properties:
1882# - A single output character can correspond to many bytes of input.
1883# - The number of input bytes to complete the character can be
1884# undetermined until the last input byte is received.
1885# - The number of input bytes can vary depending on previous input.
1886# - A single input byte can correspond to many characters of output.
1887# - The number of output characters can be undetermined until the
1888# last input byte is received.
1889# - The number of output characters can vary depending on previous input.
1890
1891class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1892 """
1893 For testing seek/tell behavior with a stateful, buffering decoder.
1894
1895 Input is a sequence of words. Words may be fixed-length (length set
1896 by input) or variable-length (period-terminated). In variable-length
1897 mode, extra periods are ignored. Possible words are:
1898 - 'i' followed by a number sets the input length, I (maximum 99).
1899 When I is set to 0, words are space-terminated.
1900 - 'o' followed by a number sets the output length, O (maximum 99).
1901 - Any other word is converted into a word followed by a period on
1902 the output. The output word consists of the input word truncated
1903 or padded out with hyphens to make its length equal to O. If O
1904 is 0, the word is output verbatim without truncating or padding.
1905 I and O are initially set to 1. When I changes, any buffered input is
1906 re-scanned according to the new I. EOF also terminates the last word.
1907 """
1908
1909 def __init__(self, errors='strict'):
1910 codecs.IncrementalDecoder.__init__(self, errors)
1911 self.reset()
1912
1913 def __repr__(self):
1914 return '<SID %x>' % id(self)
1915
1916 def reset(self):
1917 self.i = 1
1918 self.o = 1
1919 self.buffer = bytearray()
1920
1921 def getstate(self):
1922 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1923 return bytes(self.buffer), i*100 + o
1924
1925 def setstate(self, state):
1926 buffer, io = state
1927 self.buffer = bytearray(buffer)
1928 i, o = divmod(io, 100)
1929 self.i, self.o = i ^ 1, o ^ 1
1930
1931 def decode(self, input, final=False):
1932 output = ''
1933 for b in input:
1934 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001935 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +00001936 if self.buffer:
1937 output += self.process_word()
1938 else:
1939 self.buffer.append(b)
1940 else: # fixed-length, terminate after self.i bytes
1941 self.buffer.append(b)
1942 if len(self.buffer) == self.i:
1943 output += self.process_word()
1944 if final and self.buffer: # EOF terminates the last word
1945 output += self.process_word()
1946 return output
1947
1948 def process_word(self):
1949 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001950 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001951 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001952 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001953 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1954 else:
1955 output = self.buffer.decode('ascii')
1956 if len(output) < self.o:
1957 output += '-'*self.o # pad out with hyphens
1958 if self.o:
1959 output = output[:self.o] # truncate to output length
1960 output += '.'
1961 self.buffer = bytearray()
1962 return output
1963
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001964 codecEnabled = False
1965
1966 @classmethod
1967 def lookupTestDecoder(cls, name):
1968 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001969 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001970 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001971 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001972 incrementalencoder=None,
1973 streamreader=None, streamwriter=None,
1974 incrementaldecoder=cls)
1975
1976# Register the previous decoder for testing.
1977# Disabled by default, tests will enable it.
1978codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1979
1980
Christian Heimes1a6387e2008-03-26 12:49:49 +00001981class StatefulIncrementalDecoderTest(unittest.TestCase):
1982 """
1983 Make sure the StatefulIncrementalDecoder actually works.
1984 """
1985
1986 test_cases = [
1987 # I=1, O=1 (fixed-length input == fixed-length output)
1988 (b'abcd', False, 'a.b.c.d.'),
1989 # I=0, O=0 (variable-length input, variable-length output)
1990 (b'oiabcd', True, 'abcd.'),
1991 # I=0, O=0 (should ignore extra periods)
1992 (b'oi...abcd...', True, 'abcd.'),
1993 # I=0, O=6 (variable-length input, fixed-length output)
1994 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1995 # I=2, O=6 (fixed-length input < fixed-length output)
1996 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1997 # I=6, O=3 (fixed-length input > fixed-length output)
1998 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1999 # I=0, then 3; O=29, then 15 (with longer output)
2000 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2001 'a----------------------------.' +
2002 'b----------------------------.' +
2003 'cde--------------------------.' +
2004 'abcdefghijabcde.' +
2005 'a.b------------.' +
2006 '.c.------------.' +
2007 'd.e------------.' +
2008 'k--------------.' +
2009 'l--------------.' +
2010 'm--------------.')
2011 ]
2012
Antoine Pitrou19690592009-06-12 20:14:08 +00002013 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002014 # Try a few one-shot test cases.
2015 for input, eof, output in self.test_cases:
2016 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00002017 self.assertEqual(d.decode(input, eof), output)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002018
2019 # Also test an unfinished decode, followed by forcing EOF.
2020 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00002021 self.assertEqual(d.decode(b'oiabcd'), '')
2022 self.assertEqual(d.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00002023
2024class TextIOWrapperTest(unittest.TestCase):
2025
2026 def setUp(self):
2027 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2028 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00002029 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002030
2031 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002032 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002033
Antoine Pitrou19690592009-06-12 20:14:08 +00002034 def test_constructor(self):
2035 r = self.BytesIO(b"\xc3\xa9\n\n")
2036 b = self.BufferedReader(r, 1000)
2037 t = self.TextIOWrapper(b)
2038 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti2623a372010-11-21 13:34:58 +00002039 self.assertEqual(t.encoding, "latin1")
2040 self.assertEqual(t.line_buffering, False)
Antoine Pitrou19690592009-06-12 20:14:08 +00002041 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti2623a372010-11-21 13:34:58 +00002042 self.assertEqual(t.encoding, "utf8")
2043 self.assertEqual(t.line_buffering, True)
2044 self.assertEqual("\xe9\n", t.readline())
Antoine Pitrou19690592009-06-12 20:14:08 +00002045 self.assertRaises(TypeError, t.__init__, b, newline=42)
2046 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2047
Serhiy Storchakaabb7e652015-04-16 11:56:35 +03002048 def test_uninitialized(self):
2049 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2050 del t
2051 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2052 self.assertRaises(Exception, repr, t)
2053 self.assertRaisesRegexp((ValueError, AttributeError),
2054 'uninitialized|has no attribute',
2055 t.read, 0)
2056 t.__init__(self.MockRawIO())
2057 self.assertEqual(t.read(0), u'')
2058
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002059 def test_non_text_encoding_codecs_are_rejected(self):
2060 # Ensure the constructor complains if passed a codec that isn't
2061 # marked as a text encoding
2062 # http://bugs.python.org/issue20404
2063 r = self.BytesIO()
2064 b = self.BufferedWriter(r)
2065 with support.check_py3k_warnings():
2066 self.TextIOWrapper(b, encoding="hex_codec")
2067
Antoine Pitrou19690592009-06-12 20:14:08 +00002068 def test_detach(self):
2069 r = self.BytesIO()
2070 b = self.BufferedWriter(r)
2071 t = self.TextIOWrapper(b)
2072 self.assertIs(t.detach(), b)
2073
2074 t = self.TextIOWrapper(b, encoding="ascii")
2075 t.write("howdy")
2076 self.assertFalse(r.getvalue())
2077 t.detach()
2078 self.assertEqual(r.getvalue(), b"howdy")
2079 self.assertRaises(ValueError, t.detach)
2080
Benjamin Peterson53ae6142014-12-21 20:51:50 -06002081 # Operations independent of the detached stream should still work
2082 repr(t)
2083 self.assertEqual(t.encoding, "ascii")
2084 self.assertEqual(t.errors, "strict")
2085 self.assertFalse(t.line_buffering)
2086
Antoine Pitrou19690592009-06-12 20:14:08 +00002087 def test_repr(self):
2088 raw = self.BytesIO("hello".encode("utf-8"))
2089 b = self.BufferedReader(raw)
2090 t = self.TextIOWrapper(b, encoding="utf-8")
2091 modname = self.TextIOWrapper.__module__
2092 self.assertEqual(repr(t),
2093 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2094 raw.name = "dummy"
2095 self.assertEqual(repr(t),
2096 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
2097 raw.name = b"dummy"
2098 self.assertEqual(repr(t),
2099 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
2100
Benjamin Peterson53ae6142014-12-21 20:51:50 -06002101 t.buffer.detach()
2102 repr(t) # Should not raise an exception
2103
Antoine Pitrou19690592009-06-12 20:14:08 +00002104 def test_line_buffering(self):
2105 r = self.BytesIO()
2106 b = self.BufferedWriter(r, 1000)
2107 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
2108 t.write("X")
Ezio Melotti2623a372010-11-21 13:34:58 +00002109 self.assertEqual(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00002110 t.write("Y\nZ")
Ezio Melotti2623a372010-11-21 13:34:58 +00002111 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00002112 t.write("A\rB")
Ezio Melotti2623a372010-11-21 13:34:58 +00002113 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002114
Antoine Pitrou19690592009-06-12 20:14:08 +00002115 def test_encoding(self):
2116 # Check the encoding attribute is always set, and valid
2117 b = self.BytesIO()
2118 t = self.TextIOWrapper(b, encoding="utf8")
2119 self.assertEqual(t.encoding, "utf8")
2120 t = self.TextIOWrapper(b)
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03002121 self.assertIsNotNone(t.encoding)
Antoine Pitrou19690592009-06-12 20:14:08 +00002122 codecs.lookup(t.encoding)
2123
2124 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002125 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00002126 b = self.BytesIO(b"abc\n\xff\n")
2127 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002128 self.assertRaises(UnicodeError, t.read)
2129 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00002130 b = self.BytesIO(b"abc\n\xff\n")
2131 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002132 self.assertRaises(UnicodeError, t.read)
2133 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00002134 b = self.BytesIO(b"abc\n\xff\n")
2135 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti2623a372010-11-21 13:34:58 +00002136 self.assertEqual(t.read(), "abc\n\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002137 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00002138 b = self.BytesIO(b"abc\n\xff\n")
2139 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti2623a372010-11-21 13:34:58 +00002140 self.assertEqual(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002141
Antoine Pitrou19690592009-06-12 20:14:08 +00002142 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002143 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00002144 b = self.BytesIO()
2145 t = self.TextIOWrapper(b, encoding="ascii")
2146 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002147 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00002148 b = self.BytesIO()
2149 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2150 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002151 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00002152 b = self.BytesIO()
2153 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00002154 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00002155 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002156 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002157 self.assertEqual(b.getvalue(), b"abcdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002158 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00002159 b = self.BytesIO()
2160 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00002161 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00002162 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002163 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002164 self.assertEqual(b.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002165
Antoine Pitrou19690592009-06-12 20:14:08 +00002166 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002167 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2168
2169 tests = [
2170 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
2171 [ '', input_lines ],
2172 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2173 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2174 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
2175 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002176 encodings = (
2177 'utf-8', 'latin-1',
2178 'utf-16', 'utf-16-le', 'utf-16-be',
2179 'utf-32', 'utf-32-le', 'utf-32-be',
2180 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00002181
2182 # Try a range of buffer sizes to test the case where \r is the last
2183 # character in TextIOWrapper._pending_line.
2184 for encoding in encodings:
2185 # XXX: str.encode() should return bytes
2186 data = bytes(''.join(input_lines).encode(encoding))
2187 for do_reads in (False, True):
2188 for bufsize in range(1, 10):
2189 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002190 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2191 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00002192 encoding=encoding)
2193 if do_reads:
2194 got_lines = []
2195 while True:
2196 c2 = textio.read(2)
2197 if c2 == '':
2198 break
Ezio Melotti2623a372010-11-21 13:34:58 +00002199 self.assertEqual(len(c2), 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002200 got_lines.append(c2 + textio.readline())
2201 else:
2202 got_lines = list(textio)
2203
2204 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti2623a372010-11-21 13:34:58 +00002205 self.assertEqual(got_line, exp_line)
2206 self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002207
Antoine Pitrou19690592009-06-12 20:14:08 +00002208 def test_newlines_input(self):
2209 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002210 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2211 for newline, expected in [
2212 (None, normalized.decode("ascii").splitlines(True)),
2213 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00002214 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2215 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2216 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00002217 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00002218 buf = self.BytesIO(testdata)
2219 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti2623a372010-11-21 13:34:58 +00002220 self.assertEqual(txt.readlines(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002221 txt.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002222 self.assertEqual(txt.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002223
Antoine Pitrou19690592009-06-12 20:14:08 +00002224 def test_newlines_output(self):
2225 testdict = {
2226 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2227 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2228 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2229 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2230 }
2231 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2232 for newline, expected in tests:
2233 buf = self.BytesIO()
2234 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2235 txt.write("AAA\nB")
2236 txt.write("BB\nCCC\n")
2237 txt.write("X\rY\r\nZ")
2238 txt.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002239 self.assertEqual(buf.closed, False)
2240 self.assertEqual(buf.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00002241
2242 def test_destructor(self):
2243 l = []
2244 base = self.BytesIO
2245 class MyBytesIO(base):
2246 def close(self):
2247 l.append(self.getvalue())
2248 base.close(self)
2249 b = MyBytesIO()
2250 t = self.TextIOWrapper(b, encoding="ascii")
2251 t.write("abc")
2252 del t
2253 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00002254 self.assertEqual([b"abc"], l)
Antoine Pitrou19690592009-06-12 20:14:08 +00002255
2256 def test_override_destructor(self):
2257 record = []
2258 class MyTextIO(self.TextIOWrapper):
2259 def __del__(self):
2260 record.append(1)
2261 try:
2262 f = super(MyTextIO, self).__del__
2263 except AttributeError:
2264 pass
2265 else:
2266 f()
2267 def close(self):
2268 record.append(2)
2269 super(MyTextIO, self).close()
2270 def flush(self):
2271 record.append(3)
2272 super(MyTextIO, self).flush()
2273 b = self.BytesIO()
2274 t = MyTextIO(b, encoding="ascii")
2275 del t
2276 support.gc_collect()
2277 self.assertEqual(record, [1, 2, 3])
2278
2279 def test_error_through_destructor(self):
2280 # Test that the exception state is not modified by a destructor,
2281 # even if close() fails.
2282 rawio = self.CloseFailureIO()
2283 def f():
2284 self.TextIOWrapper(rawio).xyzzy
2285 with support.captured_output("stderr") as s:
2286 self.assertRaises(AttributeError, f)
2287 s = s.getvalue().strip()
2288 if s:
2289 # The destructor *may* have printed an unraisable error, check it
2290 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002291 self.assertTrue(s.startswith("Exception IOError: "), s)
2292 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002293
2294 # Systematic tests of the text I/O API
2295
Antoine Pitrou19690592009-06-12 20:14:08 +00002296 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002297 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2298 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00002299 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002300 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002301 self.assertEqual(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002302 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002303 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002304 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002305 self.assertEqual(f.tell(), 0)
2306 self.assertEqual(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002307 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002308 self.assertEqual(f.seek(0), 0)
2309 self.assertEqual(f.read(None), "abc")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002310 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002311 self.assertEqual(f.read(2), "ab")
2312 self.assertEqual(f.read(1), "c")
2313 self.assertEqual(f.read(1), "")
2314 self.assertEqual(f.read(), "")
2315 self.assertEqual(f.tell(), cookie)
2316 self.assertEqual(f.seek(0), 0)
2317 self.assertEqual(f.seek(0, 2), cookie)
2318 self.assertEqual(f.write("def"), 3)
2319 self.assertEqual(f.seek(cookie), cookie)
2320 self.assertEqual(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002321 if enc.startswith("utf"):
2322 self.multi_line_test(f, enc)
2323 f.close()
2324
2325 def multi_line_test(self, f, enc):
2326 f.seek(0)
2327 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00002328 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002329 wlines = []
2330 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2331 chars = []
2332 for i in range(size):
2333 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00002334 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002335 wlines.append((f.tell(), line))
2336 f.write(line)
2337 f.seek(0)
2338 rlines = []
2339 while True:
2340 pos = f.tell()
2341 line = f.readline()
2342 if not line:
2343 break
2344 rlines.append((pos, line))
Ezio Melotti2623a372010-11-21 13:34:58 +00002345 self.assertEqual(rlines, wlines)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002346
Antoine Pitrou19690592009-06-12 20:14:08 +00002347 def test_telling(self):
2348 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002349 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002350 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002351 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002352 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002353 p2 = f.tell()
2354 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002355 self.assertEqual(f.tell(), p0)
2356 self.assertEqual(f.readline(), "\xff\n")
2357 self.assertEqual(f.tell(), p1)
2358 self.assertEqual(f.readline(), "\xff\n")
2359 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002360 f.seek(0)
2361 for line in f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002362 self.assertEqual(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002363 self.assertRaises(IOError, f.tell)
Ezio Melotti2623a372010-11-21 13:34:58 +00002364 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002365 f.close()
2366
Antoine Pitrou19690592009-06-12 20:14:08 +00002367 def test_seeking(self):
2368 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00002369 prefix_size = chunk_size - 2
2370 u_prefix = "a" * prefix_size
2371 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melotti2623a372010-11-21 13:34:58 +00002372 self.assertEqual(len(u_prefix), len(prefix))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002373 u_suffix = "\u8888\n"
2374 suffix = bytes(u_suffix.encode("utf-8"))
2375 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00002376 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002377 f.write(line*2)
2378 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002379 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002380 s = f.read(prefix_size)
Ezio Melotti2623a372010-11-21 13:34:58 +00002381 self.assertEqual(s, prefix.decode("ascii"))
2382 self.assertEqual(f.tell(), prefix_size)
2383 self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002384
Antoine Pitrou19690592009-06-12 20:14:08 +00002385 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002386 # Regression test for a specific bug
2387 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00002388 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002389 f.write(data)
2390 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002391 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002392 f._CHUNK_SIZE # Just test that it exists
2393 f._CHUNK_SIZE = 2
2394 f.readline()
2395 f.tell()
2396
Antoine Pitrou19690592009-06-12 20:14:08 +00002397 def test_seek_and_tell(self):
2398 #Test seek/tell using the StatefulIncrementalDecoder.
2399 # Make test faster by doing smaller seeks
2400 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00002401
Antoine Pitrou19690592009-06-12 20:14:08 +00002402 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002403 """Tell/seek to various points within a data stream and ensure
2404 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00002405 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00002406 f.write(data)
2407 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002408 f = self.open(support.TESTFN, encoding='test_decoder')
2409 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00002410 decoded = f.read()
2411 f.close()
2412
2413 for i in range(min_pos, len(decoded) + 1): # seek positions
2414 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00002415 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melotti2623a372010-11-21 13:34:58 +00002416 self.assertEqual(f.read(i), decoded[:i])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002417 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002418 self.assertEqual(f.read(j), decoded[i:i + j])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002419 f.seek(cookie)
Ezio Melotti2623a372010-11-21 13:34:58 +00002420 self.assertEqual(f.read(), decoded[i:])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002421 f.close()
2422
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002423 # Enable the test decoder.
2424 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00002425
2426 # Run the tests.
2427 try:
2428 # Try each test case.
2429 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00002430 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002431
2432 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00002433 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2434 offset = CHUNK_SIZE - len(input)//2
2435 prefix = b'.'*offset
2436 # Don't bother seeking into the prefix (takes too long).
2437 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00002438 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002439
2440 # Ensure our test decoder won't interfere with subsequent tests.
2441 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002442 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002443
Antoine Pitrou19690592009-06-12 20:14:08 +00002444 def test_encoded_writes(self):
2445 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002446 tests = ("utf-16",
2447 "utf-16-le",
2448 "utf-16-be",
2449 "utf-32",
2450 "utf-32-le",
2451 "utf-32-be")
2452 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002453 buf = self.BytesIO()
2454 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002455 # Check if the BOM is written only once (see issue1753).
2456 f.write(data)
2457 f.write(data)
2458 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002459 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002460 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002461 self.assertEqual(f.read(), data * 2)
2462 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002463
Antoine Pitrou19690592009-06-12 20:14:08 +00002464 def test_unreadable(self):
2465 class UnReadable(self.BytesIO):
2466 def readable(self):
2467 return False
2468 txt = self.TextIOWrapper(UnReadable())
2469 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002470
Antoine Pitrou19690592009-06-12 20:14:08 +00002471 def test_read_one_by_one(self):
2472 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002473 reads = ""
2474 while True:
2475 c = txt.read(1)
2476 if not c:
2477 break
2478 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002479 self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002480
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002481 def test_readlines(self):
2482 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2483 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2484 txt.seek(0)
2485 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2486 txt.seek(0)
2487 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2488
Christian Heimes1a6387e2008-03-26 12:49:49 +00002489 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002490 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002491 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002492 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002493 reads = ""
2494 while True:
2495 c = txt.read(128)
2496 if not c:
2497 break
2498 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002499 self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002500
Antoine Pitroueadca1d2012-10-16 23:02:27 +02002501 def test_writelines(self):
2502 l = ['ab', 'cd', 'ef']
2503 buf = self.BytesIO()
2504 txt = self.TextIOWrapper(buf)
2505 txt.writelines(l)
2506 txt.flush()
2507 self.assertEqual(buf.getvalue(), b'abcdef')
2508
2509 def test_writelines_userlist(self):
2510 l = UserList(['ab', 'cd', 'ef'])
2511 buf = self.BytesIO()
2512 txt = self.TextIOWrapper(buf)
2513 txt.writelines(l)
2514 txt.flush()
2515 self.assertEqual(buf.getvalue(), b'abcdef')
2516
2517 def test_writelines_error(self):
2518 txt = self.TextIOWrapper(self.BytesIO())
2519 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2520 self.assertRaises(TypeError, txt.writelines, None)
2521 self.assertRaises(TypeError, txt.writelines, b'abc')
2522
Christian Heimes1a6387e2008-03-26 12:49:49 +00002523 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002524 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002525
2526 # read one char at a time
2527 reads = ""
2528 while True:
2529 c = txt.read(1)
2530 if not c:
2531 break
2532 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002533 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002534
2535 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002536 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002537 txt._CHUNK_SIZE = 4
2538
2539 reads = ""
2540 while True:
2541 c = txt.read(4)
2542 if not c:
2543 break
2544 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002545 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002546
2547 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002548 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002549 txt._CHUNK_SIZE = 4
2550
2551 reads = txt.read(4)
2552 reads += txt.read(4)
2553 reads += txt.readline()
2554 reads += txt.readline()
2555 reads += txt.readline()
Ezio Melotti2623a372010-11-21 13:34:58 +00002556 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002557
2558 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002559 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002560 txt._CHUNK_SIZE = 4
2561
2562 reads = txt.read(4)
2563 reads += txt.read()
Ezio Melotti2623a372010-11-21 13:34:58 +00002564 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002565
2566 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002567 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002568 txt._CHUNK_SIZE = 4
2569
2570 reads = txt.read(4)
2571 pos = txt.tell()
2572 txt.seek(0)
2573 txt.seek(pos)
Ezio Melotti2623a372010-11-21 13:34:58 +00002574 self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002575
2576 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002577 buffer = self.BytesIO(self.testdata)
2578 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002579
2580 self.assertEqual(buffer.seekable(), txt.seekable())
2581
Antoine Pitrou19690592009-06-12 20:14:08 +00002582 def test_append_bom(self):
2583 # The BOM is not written again when appending to a non-empty file
2584 filename = support.TESTFN
2585 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2586 with self.open(filename, 'w', encoding=charset) as f:
2587 f.write('aaa')
2588 pos = f.tell()
2589 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002590 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002591
2592 with self.open(filename, 'a', encoding=charset) as f:
2593 f.write('xxx')
2594 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002595 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002596
Antoine Pitrou19690592009-06-12 20:14:08 +00002597 def test_seek_bom(self):
2598 # Same test, but when seeking manually
2599 filename = support.TESTFN
2600 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2601 with self.open(filename, 'w', encoding=charset) as f:
2602 f.write('aaa')
2603 pos = f.tell()
2604 with self.open(filename, 'r+', encoding=charset) as f:
2605 f.seek(pos)
2606 f.write('zzz')
2607 f.seek(0)
2608 f.write('bbb')
2609 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002610 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002611
2612 def test_errors_property(self):
2613 with self.open(support.TESTFN, "w") as f:
2614 self.assertEqual(f.errors, "strict")
2615 with self.open(support.TESTFN, "w", errors="replace") as f:
2616 self.assertEqual(f.errors, "replace")
2617
Victor Stinner6a102812010-04-27 23:55:59 +00002618 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002619 def test_threads_write(self):
2620 # Issue6750: concurrent writes could duplicate data
2621 event = threading.Event()
2622 with self.open(support.TESTFN, "w", buffering=1) as f:
2623 def run(n):
2624 text = "Thread%03d\n" % n
2625 event.wait()
2626 f.write(text)
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03002627 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002628 for x in range(20)]
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03002629 with support.start_threads(threads, event.set):
2630 time.sleep(0.02)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002631 with self.open(support.TESTFN) as f:
2632 content = f.read()
2633 for n in range(20):
Ezio Melotti2623a372010-11-21 13:34:58 +00002634 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002635
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002636 def test_flush_error_on_close(self):
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002637 # Test that text file is closed despite failed flush
2638 # and that flush() is called before file closed.
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002639 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002640 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002641 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002642 closed[:] = [txt.closed, txt.buffer.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002643 raise IOError()
2644 txt.flush = bad_flush
2645 self.assertRaises(IOError, txt.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -06002646 self.assertTrue(txt.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002647 self.assertTrue(txt.buffer.closed)
2648 self.assertTrue(closed) # flush() called
2649 self.assertFalse(closed[0]) # flush() called before file closed
2650 self.assertFalse(closed[1])
Serhiy Storchaka437d5352015-02-23 00:28:38 +02002651 txt.flush = lambda: None # break reference loop
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002652
2653 def test_multi_close(self):
2654 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2655 txt.close()
2656 txt.close()
2657 txt.close()
2658 self.assertRaises(ValueError, txt.flush)
2659
Antoine Pitroufc9ead62010-12-21 21:26:55 +00002660 def test_readonly_attributes(self):
2661 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2662 buf = self.BytesIO(self.testdata)
2663 with self.assertRaises((AttributeError, TypeError)):
2664 txt.buffer = buf
2665
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002666 def test_read_nonbytes(self):
2667 # Issue #17106
2668 # Crash when underlying read() returns non-bytes
2669 class NonbytesStream(self.StringIO):
2670 read1 = self.StringIO.read
2671 class NonbytesStream(self.StringIO):
2672 read1 = self.StringIO.read
2673 t = self.TextIOWrapper(NonbytesStream('a'))
2674 with self.maybeRaises(TypeError):
2675 t.read(1)
2676 t = self.TextIOWrapper(NonbytesStream('a'))
2677 with self.maybeRaises(TypeError):
2678 t.readline()
2679 t = self.TextIOWrapper(NonbytesStream('a'))
2680 self.assertEqual(t.read(), u'a')
2681
Oren Milman30537692017-11-07 02:17:54 +02002682 def test_illegal_encoder(self):
2683 # bpo-31271: A TypeError should be raised in case the return value of
2684 # encoder's encode() is invalid.
2685 class BadEncoder:
2686 def encode(self, dummy):
2687 return u'spam'
2688 def get_bad_encoder(dummy):
2689 return BadEncoder()
2690 rot13 = codecs.lookup("rot13")
2691 with support.swap_attr(rot13, '_is_text_encoding', True), \
2692 support.swap_attr(rot13, 'incrementalencoder', get_bad_encoder):
2693 t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
2694 with self.assertRaises(TypeError):
2695 t.write('bar')
2696 t.flush()
2697
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002698 def test_illegal_decoder(self):
2699 # Issue #17106
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002700 # Bypass the early encoding check added in issue 20404
2701 def _make_illegal_wrapper():
2702 quopri = codecs.lookup("quopri_codec")
2703 quopri._is_text_encoding = True
2704 try:
2705 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
2706 newline='\n', encoding="quopri_codec")
2707 finally:
2708 quopri._is_text_encoding = False
2709 return t
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002710 # Crash when decoder returns non-string
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002711 with support.check_py3k_warnings():
2712 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2713 encoding='quopri_codec')
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002714 with self.maybeRaises(TypeError):
2715 t.read(1)
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002716 with support.check_py3k_warnings():
2717 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2718 encoding='quopri_codec')
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002719 with self.maybeRaises(TypeError):
2720 t.readline()
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002721 with support.check_py3k_warnings():
2722 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2723 encoding='quopri_codec')
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002724 with self.maybeRaises(TypeError):
2725 t.read()
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002726 #else:
2727 #t = _make_illegal_wrapper()
2728 #self.assertRaises(TypeError, t.read, 1)
2729 #t = _make_illegal_wrapper()
2730 #self.assertRaises(TypeError, t.readline)
2731 #t = _make_illegal_wrapper()
2732 #self.assertRaises(TypeError, t.read)
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002733
Oren Milman20958e62017-08-29 19:16:12 +03002734 # Issue 31243: calling read() while the return value of decoder's
2735 # getstate() is invalid should neither crash the interpreter nor
2736 # raise a SystemError.
2737 def _make_very_illegal_wrapper(getstate_ret_val):
2738 class BadDecoder:
2739 def getstate(self):
2740 return getstate_ret_val
2741 def _get_bad_decoder(dummy):
2742 return BadDecoder()
2743 quopri = codecs.lookup("quopri_codec")
2744 with support.swap_attr(quopri, 'incrementaldecoder',
2745 _get_bad_decoder):
2746 return _make_illegal_wrapper()
2747 t = _make_very_illegal_wrapper(42)
2748 with self.maybeRaises(TypeError):
2749 t.read(42)
2750 t = _make_very_illegal_wrapper(())
2751 with self.maybeRaises(TypeError):
2752 t.read(42)
2753
Zackery Spytz0464de02018-06-29 14:07:13 -06002754 def test_issue25862(self):
2755 # Assertion failures occurred in tell() after read() and write().
2756 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
2757 t.read(1)
2758 t.read()
2759 t.tell()
2760 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
2761 t.read(1)
2762 t.write('x')
2763 t.tell()
2764
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002765
Antoine Pitrou19690592009-06-12 20:14:08 +00002766class CTextIOWrapperTest(TextIOWrapperTest):
2767
2768 def test_initialization(self):
2769 r = self.BytesIO(b"\xc3\xa9\n\n")
2770 b = self.BufferedReader(r, 1000)
2771 t = self.TextIOWrapper(b)
2772 self.assertRaises(TypeError, t.__init__, b, newline=42)
2773 self.assertRaises(ValueError, t.read)
2774 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2775 self.assertRaises(ValueError, t.read)
2776
Benjamin Peterson53ae6142014-12-21 20:51:50 -06002777 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2778 self.assertRaises(Exception, repr, t)
2779
Antoine Pitrou19690592009-06-12 20:14:08 +00002780 def test_garbage_collection(self):
2781 # C TextIOWrapper objects are collected, and collecting them flushes
2782 # all data to disk.
2783 # The Python version has __del__, so it ends in gc.garbage instead.
2784 rawio = io.FileIO(support.TESTFN, "wb")
2785 b = self.BufferedWriter(rawio)
2786 t = self.TextIOWrapper(b, encoding="ascii")
2787 t.write("456def")
2788 t.x = t
2789 wr = weakref.ref(t)
2790 del t
2791 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03002792 self.assertIsNone(wr(), wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002793 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002794 self.assertEqual(f.read(), b"456def")
2795
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002796 def test_rwpair_cleared_before_textio(self):
2797 # Issue 13070: TextIOWrapper's finalization would crash when called
2798 # after the reference to the underlying BufferedRWPair's writer got
2799 # cleared by the GC.
2800 for i in range(1000):
2801 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2802 t1 = self.TextIOWrapper(b1, encoding="ascii")
2803 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2804 t2 = self.TextIOWrapper(b2, encoding="ascii")
2805 # circular references
2806 t1.buddy = t2
2807 t2.buddy = t1
2808 support.gc_collect()
2809
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002810 maybeRaises = unittest.TestCase.assertRaises
2811
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002812
Antoine Pitrou19690592009-06-12 20:14:08 +00002813class PyTextIOWrapperTest(TextIOWrapperTest):
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002814 @contextlib.contextmanager
2815 def maybeRaises(self, *args, **kwds):
2816 yield
Antoine Pitrou19690592009-06-12 20:14:08 +00002817
2818
2819class IncrementalNewlineDecoderTest(unittest.TestCase):
2820
2821 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002822 # UTF-8 specific tests for a newline decoder
2823 def _check_decode(b, s, **kwargs):
2824 # We exercise getstate() / setstate() as well as decode()
2825 state = decoder.getstate()
Ezio Melotti2623a372010-11-21 13:34:58 +00002826 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002827 decoder.setstate(state)
Ezio Melotti2623a372010-11-21 13:34:58 +00002828 self.assertEqual(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002829
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002830 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002831
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002832 _check_decode(b'\xe8', "")
2833 _check_decode(b'\xa2', "")
2834 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002835
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002836 _check_decode(b'\xe8', "")
2837 _check_decode(b'\xa2', "")
2838 _check_decode(b'\x88', "\u8888")
2839
2840 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002841 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2842
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002843 decoder.reset()
2844 _check_decode(b'\n', "\n")
2845 _check_decode(b'\r', "")
2846 _check_decode(b'', "\n", final=True)
2847 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002848
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002849 _check_decode(b'\r', "")
2850 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002851
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002852 _check_decode(b'\r\r\n', "\n\n")
2853 _check_decode(b'\r', "")
2854 _check_decode(b'\r', "\n")
2855 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002856
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002857 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2858 _check_decode(b'\xe8\xa2\x88', "\u8888")
2859 _check_decode(b'\n', "\n")
2860 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2861 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002862
Antoine Pitrou19690592009-06-12 20:14:08 +00002863 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002864 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002865 if encoding is not None:
2866 encoder = codecs.getincrementalencoder(encoding)()
2867 def _decode_bytewise(s):
2868 # Decode one byte at a time
2869 for b in encoder.encode(s):
2870 result.append(decoder.decode(b))
2871 else:
2872 encoder = None
2873 def _decode_bytewise(s):
2874 # Decode one char at a time
2875 for c in s:
2876 result.append(decoder.decode(c))
Ezio Melotti2623a372010-11-21 13:34:58 +00002877 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002878 _decode_bytewise("abc\n\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002879 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002880 _decode_bytewise("\nabc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002881 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002882 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002883 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002884 _decode_bytewise("abc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002885 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002886 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002887 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002888 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002889 input = "abc"
2890 if encoder is not None:
2891 encoder.reset()
2892 input = encoder.encode(input)
Ezio Melotti2623a372010-11-21 13:34:58 +00002893 self.assertEqual(decoder.decode(input), "abc")
2894 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002895
2896 def test_newline_decoder(self):
2897 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002898 # None meaning the IncrementalNewlineDecoder takes unicode input
2899 # rather than bytes input
2900 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002901 'utf-16', 'utf-16-le', 'utf-16-be',
2902 'utf-32', 'utf-32-le', 'utf-32-be',
2903 )
2904 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002905 decoder = enc and codecs.getincrementaldecoder(enc)()
2906 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2907 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002908 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002909 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2910 self.check_newline_decoding_utf8(decoder)
2911
2912 def test_newline_bytes(self):
2913 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2914 def _check(dec):
Ezio Melotti2623a372010-11-21 13:34:58 +00002915 self.assertEqual(dec.newlines, None)
2916 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2917 self.assertEqual(dec.newlines, None)
2918 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2919 self.assertEqual(dec.newlines, None)
Antoine Pitrou19690592009-06-12 20:14:08 +00002920 dec = self.IncrementalNewlineDecoder(None, translate=False)
2921 _check(dec)
2922 dec = self.IncrementalNewlineDecoder(None, translate=True)
2923 _check(dec)
2924
2925class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2926 pass
2927
2928class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2929 pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002930
Christian Heimes1a6387e2008-03-26 12:49:49 +00002931
2932# XXX Tests for open()
2933
2934class MiscIOTest(unittest.TestCase):
2935
Benjamin Petersonad100c32008-11-20 22:06:22 +00002936 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002937 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002938
Antoine Pitrou19690592009-06-12 20:14:08 +00002939 def test___all__(self):
2940 for name in self.io.__all__:
2941 obj = getattr(self.io, name, None)
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03002942 self.assertIsNotNone(obj, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002943 if name == "open":
2944 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002945 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002946 self.assertTrue(issubclass(obj, Exception), name)
2947 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002948 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002949
Benjamin Petersonad100c32008-11-20 22:06:22 +00002950 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002951 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002952 self.assertEqual(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002953 f.close()
2954
Antoine Pitrou19690592009-06-12 20:14:08 +00002955 f = self.open(support.TESTFN, "U")
Ezio Melotti2623a372010-11-21 13:34:58 +00002956 self.assertEqual(f.name, support.TESTFN)
2957 self.assertEqual(f.buffer.name, support.TESTFN)
2958 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2959 self.assertEqual(f.mode, "U")
2960 self.assertEqual(f.buffer.mode, "rb")
2961 self.assertEqual(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002962 f.close()
2963
Antoine Pitrou19690592009-06-12 20:14:08 +00002964 f = self.open(support.TESTFN, "w+")
Ezio Melotti2623a372010-11-21 13:34:58 +00002965 self.assertEqual(f.mode, "w+")
2966 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2967 self.assertEqual(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002968
Antoine Pitrou19690592009-06-12 20:14:08 +00002969 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melotti2623a372010-11-21 13:34:58 +00002970 self.assertEqual(g.mode, "wb")
2971 self.assertEqual(g.raw.mode, "wb")
2972 self.assertEqual(g.name, f.fileno())
2973 self.assertEqual(g.raw.name, f.fileno())
Benjamin Petersonad100c32008-11-20 22:06:22 +00002974 f.close()
2975 g.close()
2976
Antoine Pitrou19690592009-06-12 20:14:08 +00002977 def test_io_after_close(self):
2978 for kwargs in [
2979 {"mode": "w"},
2980 {"mode": "wb"},
2981 {"mode": "w", "buffering": 1},
2982 {"mode": "w", "buffering": 2},
2983 {"mode": "wb", "buffering": 0},
2984 {"mode": "r"},
2985 {"mode": "rb"},
2986 {"mode": "r", "buffering": 1},
2987 {"mode": "r", "buffering": 2},
2988 {"mode": "rb", "buffering": 0},
2989 {"mode": "w+"},
2990 {"mode": "w+b"},
2991 {"mode": "w+", "buffering": 1},
2992 {"mode": "w+", "buffering": 2},
2993 {"mode": "w+b", "buffering": 0},
2994 ]:
2995 f = self.open(support.TESTFN, **kwargs)
2996 f.close()
2997 self.assertRaises(ValueError, f.flush)
2998 self.assertRaises(ValueError, f.fileno)
2999 self.assertRaises(ValueError, f.isatty)
3000 self.assertRaises(ValueError, f.__iter__)
3001 if hasattr(f, "peek"):
3002 self.assertRaises(ValueError, f.peek, 1)
3003 self.assertRaises(ValueError, f.read)
3004 if hasattr(f, "read1"):
3005 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinner5100a402011-05-25 22:15:36 +02003006 if hasattr(f, "readall"):
3007 self.assertRaises(ValueError, f.readall)
Antoine Pitrou19690592009-06-12 20:14:08 +00003008 if hasattr(f, "readinto"):
3009 self.assertRaises(ValueError, f.readinto, bytearray(1024))
3010 self.assertRaises(ValueError, f.readline)
3011 self.assertRaises(ValueError, f.readlines)
Xiang Zhang5fbdfc32017-04-15 13:18:22 +08003012 self.assertRaises(ValueError, f.readlines, 1)
Antoine Pitrou19690592009-06-12 20:14:08 +00003013 self.assertRaises(ValueError, f.seek, 0)
3014 self.assertRaises(ValueError, f.tell)
3015 self.assertRaises(ValueError, f.truncate)
3016 self.assertRaises(ValueError, f.write,
3017 b"" if "b" in kwargs['mode'] else "")
3018 self.assertRaises(ValueError, f.writelines, [])
3019 self.assertRaises(ValueError, next, f)
3020
3021 def test_blockingioerror(self):
3022 # Various BlockingIOError issues
3023 self.assertRaises(TypeError, self.BlockingIOError)
3024 self.assertRaises(TypeError, self.BlockingIOError, 1)
3025 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
3026 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
3027 b = self.BlockingIOError(1, "")
3028 self.assertEqual(b.characters_written, 0)
3029 class C(unicode):
3030 pass
3031 c = C("")
3032 b = self.BlockingIOError(1, c)
3033 c.b = b
3034 b.c = c
3035 wr = weakref.ref(c)
3036 del c, b
3037 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03003038 self.assertIsNone(wr(), wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00003039
3040 def test_abcs(self):
3041 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00003042 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3043 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3044 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3045 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00003046
3047 def _check_abc_inheritance(self, abcmodule):
3048 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00003049 self.assertIsInstance(f, abcmodule.IOBase)
3050 self.assertIsInstance(f, abcmodule.RawIOBase)
3051 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3052 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00003053 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00003054 self.assertIsInstance(f, abcmodule.IOBase)
3055 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3056 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3057 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00003058 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00003059 self.assertIsInstance(f, abcmodule.IOBase)
3060 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3061 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3062 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00003063
3064 def test_abc_inheritance(self):
3065 # Test implementations inherit from their respective ABCs
3066 self._check_abc_inheritance(self)
3067
3068 def test_abc_inheritance_official(self):
3069 # Test implementations inherit from the official ABCs of the
3070 # baseline "io" module.
3071 self._check_abc_inheritance(io)
3072
Antoine Pitrou5aa7df32011-11-21 20:16:44 +01003073 @unittest.skipUnless(fcntl, 'fcntl required for this test')
3074 def test_nonblock_pipe_write_bigbuf(self):
3075 self._test_nonblock_pipe_write(16*1024)
3076
3077 @unittest.skipUnless(fcntl, 'fcntl required for this test')
3078 def test_nonblock_pipe_write_smallbuf(self):
3079 self._test_nonblock_pipe_write(1024)
3080
3081 def _set_non_blocking(self, fd):
3082 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
3083 self.assertNotEqual(flags, -1)
3084 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
3085 self.assertEqual(res, 0)
3086
3087 def _test_nonblock_pipe_write(self, bufsize):
3088 sent = []
3089 received = []
3090 r, w = os.pipe()
3091 self._set_non_blocking(r)
3092 self._set_non_blocking(w)
3093
3094 # To exercise all code paths in the C implementation we need
3095 # to play with buffer sizes. For instance, if we choose a
3096 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3097 # then we will never get a partial write of the buffer.
3098 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3099 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3100
3101 with rf, wf:
3102 for N in 9999, 73, 7574:
3103 try:
3104 i = 0
3105 while True:
3106 msg = bytes([i % 26 + 97]) * N
3107 sent.append(msg)
3108 wf.write(msg)
3109 i += 1
3110
3111 except self.BlockingIOError as e:
3112 self.assertEqual(e.args[0], errno.EAGAIN)
3113 sent[-1] = sent[-1][:e.characters_written]
3114 received.append(rf.read())
3115 msg = b'BLOCKED'
3116 wf.write(msg)
3117 sent.append(msg)
3118
3119 while True:
3120 try:
3121 wf.flush()
3122 break
3123 except self.BlockingIOError as e:
3124 self.assertEqual(e.args[0], errno.EAGAIN)
3125 self.assertEqual(e.characters_written, 0)
3126 received.append(rf.read())
3127
3128 received += iter(rf.read, None)
3129
3130 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03003131 self.assertEqual(sent, received)
Antoine Pitrou5aa7df32011-11-21 20:16:44 +01003132 self.assertTrue(wf.closed)
3133 self.assertTrue(rf.closed)
3134
Antoine Pitrou19690592009-06-12 20:14:08 +00003135class CMiscIOTest(MiscIOTest):
3136 io = io
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03003137 shutdown_error = "RuntimeError: could not find io module state"
Antoine Pitrou19690592009-06-12 20:14:08 +00003138
3139class PyMiscIOTest(MiscIOTest):
3140 io = pyio
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03003141 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Petersonad100c32008-11-20 22:06:22 +00003142
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003143
3144@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3145class SignalsTest(unittest.TestCase):
3146
3147 def setUp(self):
3148 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3149
3150 def tearDown(self):
3151 signal.signal(signal.SIGALRM, self.oldalrm)
3152
3153 def alarm_interrupt(self, sig, frame):
Florent Xicluna3fa3b002010-09-13 08:20:19 +00003154 1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003155
3156 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinner49d495f2011-07-04 11:44:46 +02003157 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
3158 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003159 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3160 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou6439c002011-02-25 21:35:47 +00003161 invokes the signal handler, and bubbles up the exception raised
3162 in the latter."""
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003163 read_results = []
3164 def _read():
3165 s = os.read(r, 1)
3166 read_results.append(s)
3167 t = threading.Thread(target=_read)
3168 t.daemon = True
3169 r, w = os.pipe()
3170 try:
3171 wio = self.io.open(w, **fdopen_kwargs)
3172 t.start()
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003173 # Fill the pipe enough that the write will be blocking.
3174 # It will be interrupted by the timer armed above. Since the
3175 # other thread has read one byte, the low-level write will
3176 # return with a successful (partial) result rather than an EINTR.
3177 # The buffered IO layer must check for pending signal
3178 # handlers, which in this case will invoke alarm_interrupt().
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03003179 try:
Victor Stinner3604b232018-06-01 15:23:02 +02003180 signal.alarm(1)
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03003181 with self.assertRaises(ZeroDivisionError):
3182 wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1))
3183 finally:
Victor Stinner3604b232018-06-01 15:23:02 +02003184 signal.alarm(0)
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03003185 t.join()
Victor Stinner3604b232018-06-01 15:23:02 +02003186
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003187 # We got one byte, get another one and check that it isn't a
3188 # repeat of the first one.
3189 read_results.append(os.read(r, 1))
3190 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3191 finally:
3192 os.close(w)
3193 os.close(r)
3194 # This is deliberate. If we didn't close the file descriptor
3195 # before closing wio, wio would try to flush its internal
3196 # buffer, and block again.
3197 try:
3198 wio.close()
3199 except IOError as e:
3200 if e.errno != errno.EBADF:
3201 raise
3202
3203 def test_interrupted_write_unbuffered(self):
3204 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3205
3206 def test_interrupted_write_buffered(self):
3207 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3208
3209 def test_interrupted_write_text(self):
3210 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3211
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003212 def check_reentrant_write(self, data, **fdopen_kwargs):
3213 def on_alarm(*args):
3214 # Will be called reentrantly from the same thread
3215 wio.write(data)
Victor Stinner4c41f842011-07-05 13:29:26 +02003216 1//0
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003217 signal.signal(signal.SIGALRM, on_alarm)
3218 r, w = os.pipe()
3219 wio = self.io.open(w, **fdopen_kwargs)
3220 try:
3221 signal.alarm(1)
3222 # Either the reentrant call to wio.write() fails with RuntimeError,
3223 # or the signal handler raises ZeroDivisionError.
3224 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3225 while 1:
3226 for i in range(100):
3227 wio.write(data)
3228 wio.flush()
3229 # Make sure the buffer doesn't fill up and block further writes
3230 os.read(r, len(data) * 100)
3231 exc = cm.exception
3232 if isinstance(exc, RuntimeError):
3233 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3234 finally:
Victor Stinner3604b232018-06-01 15:23:02 +02003235 signal.alarm(0)
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003236 wio.close()
3237 os.close(r)
3238
3239 def test_reentrant_write_buffered(self):
3240 self.check_reentrant_write(b"xy", mode="wb")
3241
3242 def test_reentrant_write_text(self):
3243 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3244
Antoine Pitrou6439c002011-02-25 21:35:47 +00003245 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3246 """Check that a buffered read, when it gets interrupted (either
3247 returning a partial result or EINTR), properly invokes the signal
3248 handler and retries if the latter returned successfully."""
3249 r, w = os.pipe()
3250 fdopen_kwargs["closefd"] = False
3251 def alarm_handler(sig, frame):
3252 os.write(w, b"bar")
3253 signal.signal(signal.SIGALRM, alarm_handler)
3254 try:
3255 rio = self.io.open(r, **fdopen_kwargs)
3256 os.write(w, b"foo")
3257 signal.alarm(1)
3258 # Expected behaviour:
3259 # - first raw read() returns partial b"foo"
3260 # - second raw read() returns EINTR
3261 # - third raw read() returns b"bar"
3262 self.assertEqual(decode(rio.read(6)), "foobar")
3263 finally:
Victor Stinner3604b232018-06-01 15:23:02 +02003264 signal.alarm(0)
Antoine Pitrou6439c002011-02-25 21:35:47 +00003265 rio.close()
3266 os.close(w)
3267 os.close(r)
3268
3269 def test_interrupterd_read_retry_buffered(self):
3270 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3271 mode="rb")
3272
3273 def test_interrupterd_read_retry_text(self):
3274 self.check_interrupted_read_retry(lambda x: x,
3275 mode="r")
3276
3277 @unittest.skipUnless(threading, 'Threading required for this test.')
3278 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3279 """Check that a buffered write, when it gets interrupted (either
3280 returning a partial result or EINTR), properly invokes the signal
3281 handler and retries if the latter returned successfully."""
3282 select = support.import_module("select")
3283 # A quantity that exceeds the buffer size of an anonymous pipe's
3284 # write end.
Antoine Pitrou68915d72013-04-24 23:31:38 +02003285 N = support.PIPE_MAX_SIZE
Antoine Pitrou6439c002011-02-25 21:35:47 +00003286 r, w = os.pipe()
3287 fdopen_kwargs["closefd"] = False
3288 # We need a separate thread to read from the pipe and allow the
3289 # write() to finish. This thread is started after the SIGALRM is
3290 # received (forcing a first EINTR in write()).
3291 read_results = []
3292 write_finished = False
Serhiy Storchaka53ea1622015-03-28 20:38:48 +02003293 error = [None]
Antoine Pitrou6439c002011-02-25 21:35:47 +00003294 def _read():
Serhiy Storchaka53ea1622015-03-28 20:38:48 +02003295 try:
3296 while not write_finished:
3297 while r in select.select([r], [], [], 1.0)[0]:
3298 s = os.read(r, 1024)
3299 read_results.append(s)
3300 except BaseException as exc:
3301 error[0] = exc
Antoine Pitrou6439c002011-02-25 21:35:47 +00003302 t = threading.Thread(target=_read)
3303 t.daemon = True
3304 def alarm1(sig, frame):
3305 signal.signal(signal.SIGALRM, alarm2)
3306 signal.alarm(1)
3307 def alarm2(sig, frame):
3308 t.start()
3309 signal.signal(signal.SIGALRM, alarm1)
3310 try:
3311 wio = self.io.open(w, **fdopen_kwargs)
3312 signal.alarm(1)
3313 # Expected behaviour:
3314 # - first raw write() is partial (because of the limited pipe buffer
3315 # and the first alarm)
3316 # - second raw write() returns EINTR (because of the second alarm)
3317 # - subsequent write()s are successful (either partial or complete)
3318 self.assertEqual(N, wio.write(item * N))
3319 wio.flush()
3320 write_finished = True
3321 t.join()
Serhiy Storchaka53ea1622015-03-28 20:38:48 +02003322
3323 self.assertIsNone(error[0])
Antoine Pitrou6439c002011-02-25 21:35:47 +00003324 self.assertEqual(N, sum(len(x) for x in read_results))
3325 finally:
Victor Stinner3604b232018-06-01 15:23:02 +02003326 signal.alarm(0)
Antoine Pitrou6439c002011-02-25 21:35:47 +00003327 write_finished = True
3328 os.close(w)
3329 os.close(r)
3330 # This is deliberate. If we didn't close the file descriptor
3331 # before closing wio, wio would try to flush its internal
3332 # buffer, and could block (in case of failure).
3333 try:
3334 wio.close()
3335 except IOError as e:
3336 if e.errno != errno.EBADF:
3337 raise
3338
3339 def test_interrupterd_write_retry_buffered(self):
3340 self.check_interrupted_write_retry(b"x", mode="wb")
3341
3342 def test_interrupterd_write_retry_text(self):
3343 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3344
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003345
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003346class CSignalsTest(SignalsTest):
3347 io = io
3348
3349class PySignalsTest(SignalsTest):
3350 io = pyio
3351
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003352 # Handling reentrancy issues would slow down _pyio even more, so the
3353 # tests are disabled.
3354 test_reentrant_write_buffered = None
3355 test_reentrant_write_text = None
3356
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003357
Christian Heimes1a6387e2008-03-26 12:49:49 +00003358def test_main():
Antoine Pitrou19690592009-06-12 20:14:08 +00003359 tests = (CIOTest, PyIOTest,
3360 CBufferedReaderTest, PyBufferedReaderTest,
3361 CBufferedWriterTest, PyBufferedWriterTest,
3362 CBufferedRWPairTest, PyBufferedRWPairTest,
3363 CBufferedRandomTest, PyBufferedRandomTest,
3364 StatefulIncrementalDecoderTest,
3365 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3366 CTextIOWrapperTest, PyTextIOWrapperTest,
3367 CMiscIOTest, PyMiscIOTest,
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003368 CSignalsTest, PySignalsTest,
Antoine Pitrou19690592009-06-12 20:14:08 +00003369 )
3370
3371 # Put the namespaces of the IO module we are testing and some useful mock
3372 # classes in the __dict__ of each test.
3373 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou6391b342010-09-14 18:48:19 +00003374 MockNonBlockWriterIO, MockRawIOWithoutRead)
Antoine Pitrou19690592009-06-12 20:14:08 +00003375 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3376 c_io_ns = dict((name, getattr(io, name)) for name in all_members)
3377 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
3378 globs = globals()
3379 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3380 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3381 # Avoid turning open into a bound method.
3382 py_io_ns["open"] = pyio.OpenWrapper
3383 for test in tests:
3384 if test.__name__.startswith("C"):
3385 for name, obj in c_io_ns.items():
3386 setattr(test, name, obj)
3387 elif test.__name__.startswith("Py"):
3388 for name, obj in py_io_ns.items():
3389 setattr(test, name, obj)
3390
3391 support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00003392
3393if __name__ == "__main__":
Antoine Pitrou19690592009-06-12 20:14:08 +00003394 test_main()