blob: ea5ec656f755b31b600874319bdee89ab4cba5cb [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakac72e66a2015-11-02 15:06:09 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Antoine Pitrou19690592009-06-12 20:14:08 +000019# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +030032import warnings
Antoine Pitrou19690592009-06-12 20:14:08 +000033import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000034import signal
35import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000036from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000037from collections import deque
Antoine Pitrou78e761e2012-10-16 22:57:11 +020038from UserList import UserList
Antoine Pitrou19690592009-06-12 20:14:08 +000039from test import test_support as support
Serhiy Storchaka354d50e2013-02-03 17:10:42 +020040import contextlib
Christian Heimes1a6387e2008-03-26 12:49:49 +000041
42import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000043import io # C implementation of io
44import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000045try:
46 import threading
47except ImportError:
48 threading = None
Antoine Pitrou5aa7df32011-11-21 20:16:44 +010049try:
50 import fcntl
51except ImportError:
52 fcntl = None
Antoine Pitrou19690592009-06-12 20:14:08 +000053
54__metaclass__ = type
55bytes = support.py3k_bytes
56
Martin Panterc9813d82016-06-03 05:59:20 +000057def byteslike(*pos, **kw):
58 return memoryview(bytearray(*pos, **kw))
59
Antoine Pitrou19690592009-06-12 20:14:08 +000060def _default_chunk_size():
61 """Get the default TextIOWrapper chunk size"""
62 with io.open(__file__, "r", encoding="latin1") as f:
63 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000064
65
Antoine Pitrou6391b342010-09-14 18:48:19 +000066class MockRawIOWithoutRead:
67 """A RawIO implementation without read(), so as to exercise the default
68 RawIO.read() which calls readinto()."""
Christian Heimes1a6387e2008-03-26 12:49:49 +000069
70 def __init__(self, read_stack=()):
71 self._read_stack = list(read_stack)
72 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +000073 self._reads = 0
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000074 self._extraneous_reads = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +000075
Christian Heimes1a6387e2008-03-26 12:49:49 +000076 def write(self, b):
Antoine Pitrou19690592009-06-12 20:14:08 +000077 self._write_stack.append(bytes(b))
Christian Heimes1a6387e2008-03-26 12:49:49 +000078 return len(b)
79
80 def writable(self):
81 return True
82
83 def fileno(self):
84 return 42
85
86 def readable(self):
87 return True
88
89 def seekable(self):
90 return True
91
92 def seek(self, pos, whence):
Antoine Pitrou19690592009-06-12 20:14:08 +000093 return 0 # wrong but we gotta return something
Christian Heimes1a6387e2008-03-26 12:49:49 +000094
95 def tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +000096 return 0 # same comment as above
97
98 def readinto(self, buf):
99 self._reads += 1
100 max_len = len(buf)
101 try:
102 data = self._read_stack[0]
103 except IndexError:
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000104 self._extraneous_reads += 1
Antoine Pitrou19690592009-06-12 20:14:08 +0000105 return 0
106 if data is None:
107 del self._read_stack[0]
108 return None
109 n = len(data)
110 if len(data) <= max_len:
111 del self._read_stack[0]
112 buf[:n] = data
113 return n
114 else:
115 buf[:] = data[:max_len]
116 self._read_stack[0] = data[max_len:]
117 return max_len
118
119 def truncate(self, pos=None):
120 return pos
121
Antoine Pitrou6391b342010-09-14 18:48:19 +0000122class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
123 pass
124
125class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
126 pass
127
128
129class MockRawIO(MockRawIOWithoutRead):
130
131 def read(self, n=None):
132 self._reads += 1
133 try:
134 return self._read_stack.pop(0)
135 except:
136 self._extraneous_reads += 1
137 return b""
138
Antoine Pitrou19690592009-06-12 20:14:08 +0000139class CMockRawIO(MockRawIO, io.RawIOBase):
140 pass
141
142class PyMockRawIO(MockRawIO, pyio.RawIOBase):
143 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000144
145
Antoine Pitrou19690592009-06-12 20:14:08 +0000146class MisbehavedRawIO(MockRawIO):
147 def write(self, b):
148 return MockRawIO.write(self, b) * 2
149
150 def read(self, n=None):
151 return MockRawIO.read(self, n) * 2
152
153 def seek(self, pos, whence):
154 return -123
155
156 def tell(self):
157 return -456
158
159 def readinto(self, buf):
160 MockRawIO.readinto(self, buf)
161 return len(buf) * 5
162
163class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
164 pass
165
166class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
167 pass
168
169
170class CloseFailureIO(MockRawIO):
171 closed = 0
172
173 def close(self):
174 if not self.closed:
175 self.closed = 1
176 raise IOError
177
178class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
179 pass
180
181class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
182 pass
183
184
185class MockFileIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000186
187 def __init__(self, data):
188 self.read_history = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000189 super(MockFileIO, self).__init__(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000190
191 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +0000192 res = super(MockFileIO, self).read(n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000193 self.read_history.append(None if res is None else len(res))
194 return res
195
Antoine Pitrou19690592009-06-12 20:14:08 +0000196 def readinto(self, b):
197 res = super(MockFileIO, self).readinto(b)
198 self.read_history.append(res)
199 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000200
Antoine Pitrou19690592009-06-12 20:14:08 +0000201class CMockFileIO(MockFileIO, io.BytesIO):
202 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000203
Antoine Pitrou19690592009-06-12 20:14:08 +0000204class PyMockFileIO(MockFileIO, pyio.BytesIO):
205 pass
206
207
208class MockNonBlockWriterIO:
209
210 def __init__(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000211 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000212 self._blocker_char = None
Christian Heimes1a6387e2008-03-26 12:49:49 +0000213
Antoine Pitrou19690592009-06-12 20:14:08 +0000214 def pop_written(self):
215 s = b"".join(self._write_stack)
216 self._write_stack[:] = []
217 return s
218
219 def block_on(self, char):
220 """Block when a given char is encountered."""
221 self._blocker_char = char
222
223 def readable(self):
224 return True
225
226 def seekable(self):
227 return True
Christian Heimes1a6387e2008-03-26 12:49:49 +0000228
229 def writable(self):
230 return True
231
Antoine Pitrou19690592009-06-12 20:14:08 +0000232 def write(self, b):
233 b = bytes(b)
234 n = -1
235 if self._blocker_char:
236 try:
237 n = b.index(self._blocker_char)
238 except ValueError:
239 pass
240 else:
Antoine Pitrou5aa7df32011-11-21 20:16:44 +0100241 if n > 0:
242 # write data up to the first blocker
243 self._write_stack.append(b[:n])
244 return n
245 else:
246 # cancel blocker and indicate would block
247 self._blocker_char = None
248 return None
Antoine Pitrou19690592009-06-12 20:14:08 +0000249 self._write_stack.append(b)
250 return len(b)
251
252class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
253 BlockingIOError = io.BlockingIOError
254
255class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
256 BlockingIOError = pyio.BlockingIOError
257
Christian Heimes1a6387e2008-03-26 12:49:49 +0000258
259class IOTest(unittest.TestCase):
260
Antoine Pitrou19690592009-06-12 20:14:08 +0000261 def setUp(self):
262 support.unlink(support.TESTFN)
263
Christian Heimes1a6387e2008-03-26 12:49:49 +0000264 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000265 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000266
267 def write_ops(self, f):
268 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000269 f.truncate(0)
270 self.assertEqual(f.tell(), 5)
271 f.seek(0)
272
273 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000274 self.assertEqual(f.seek(0), 0)
275 self.assertEqual(f.write(b"Hello."), 6)
276 self.assertEqual(f.tell(), 6)
277 self.assertEqual(f.seek(-1, 1), 5)
278 self.assertEqual(f.tell(), 5)
Martin Panterc9813d82016-06-03 05:59:20 +0000279 buffer = bytearray(b" world\n\n\n")
280 self.assertEqual(f.write(buffer), 9)
281 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Christian Heimes1a6387e2008-03-26 12:49:49 +0000282 self.assertEqual(f.seek(0), 0)
283 self.assertEqual(f.write(b"h"), 1)
284 self.assertEqual(f.seek(-1, 2), 13)
285 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000286
Christian Heimes1a6387e2008-03-26 12:49:49 +0000287 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000288 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000289 self.assertRaises(TypeError, f.seek, 0.0)
290
291 def read_ops(self, f, buffered=False):
292 data = f.read(5)
293 self.assertEqual(data, b"hello")
Martin Panterc9813d82016-06-03 05:59:20 +0000294 data = byteslike(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000295 self.assertEqual(f.readinto(data), 5)
Martin Panterc9813d82016-06-03 05:59:20 +0000296 self.assertEqual(data.tobytes(), b" worl")
297 data = bytearray(5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000298 self.assertEqual(f.readinto(data), 2)
299 self.assertEqual(len(data), 5)
300 self.assertEqual(data[:2], b"d\n")
301 self.assertEqual(f.seek(0), 0)
302 self.assertEqual(f.read(20), b"hello world\n")
303 self.assertEqual(f.read(1), b"")
Martin Panterc9813d82016-06-03 05:59:20 +0000304 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000305 self.assertEqual(f.seek(-6, 2), 6)
306 self.assertEqual(f.read(5), b"world")
307 self.assertEqual(f.read(0), b"")
Martin Panterc9813d82016-06-03 05:59:20 +0000308 self.assertEqual(f.readinto(byteslike()), 0)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000309 self.assertEqual(f.seek(-6, 1), 5)
310 self.assertEqual(f.read(5), b" worl")
311 self.assertEqual(f.tell(), 10)
312 self.assertRaises(TypeError, f.seek, 0.0)
313 if buffered:
314 f.seek(0)
315 self.assertEqual(f.read(), b"hello world\n")
316 f.seek(6)
317 self.assertEqual(f.read(), b"world\n")
318 self.assertEqual(f.read(), b"")
319
320 LARGE = 2**31
321
322 def large_file_ops(self, f):
323 assert f.readable()
324 assert f.writable()
325 self.assertEqual(f.seek(self.LARGE), self.LARGE)
326 self.assertEqual(f.tell(), self.LARGE)
327 self.assertEqual(f.write(b"xxx"), 3)
328 self.assertEqual(f.tell(), self.LARGE + 3)
329 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
330 self.assertEqual(f.truncate(), self.LARGE + 2)
331 self.assertEqual(f.tell(), self.LARGE + 2)
332 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
333 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000334 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000335 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
336 self.assertEqual(f.seek(-1, 2), self.LARGE)
337 self.assertEqual(f.read(2), b"x")
338
Antoine Pitrou19690592009-06-12 20:14:08 +0000339 def test_invalid_operations(self):
340 # Try writing on a file opened in read mode and vice-versa.
341 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000342 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000343 self.assertRaises(IOError, fp.read)
344 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000345 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000346 self.assertRaises(IOError, fp.write, b"blah")
347 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000348 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000349 self.assertRaises(IOError, fp.write, "blah")
350 self.assertRaises(IOError, fp.writelines, ["blah\n"])
351
Serhiy Storchaka3c9ce742016-07-01 23:34:44 +0300352 def test_open_handles_NUL_chars(self):
353 fn_with_NUL = 'foo\0bar'
354 self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
355
356 bytes_fn = fn_with_NUL.encode('ascii')
357 with warnings.catch_warnings():
358 warnings.simplefilter("ignore", DeprecationWarning)
359 self.assertRaises(TypeError, self.open, bytes_fn, 'w')
360
Christian Heimes1a6387e2008-03-26 12:49:49 +0000361 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000362 with self.open(support.TESTFN, "wb", buffering=0) as f:
363 self.assertEqual(f.readable(), False)
364 self.assertEqual(f.writable(), True)
365 self.assertEqual(f.seekable(), True)
366 self.write_ops(f)
367 with self.open(support.TESTFN, "rb", buffering=0) as f:
368 self.assertEqual(f.readable(), True)
369 self.assertEqual(f.writable(), False)
370 self.assertEqual(f.seekable(), True)
371 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000372
373 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000374 with self.open(support.TESTFN, "wb") as f:
375 self.assertEqual(f.readable(), False)
376 self.assertEqual(f.writable(), True)
377 self.assertEqual(f.seekable(), True)
378 self.write_ops(f)
379 with self.open(support.TESTFN, "rb") as f:
380 self.assertEqual(f.readable(), True)
381 self.assertEqual(f.writable(), False)
382 self.assertEqual(f.seekable(), True)
383 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000384
385 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000386 with self.open(support.TESTFN, "wb") as f:
387 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
388 with self.open(support.TESTFN, "rb") as f:
389 self.assertEqual(f.readline(), b"abc\n")
390 self.assertEqual(f.readline(10), b"def\n")
391 self.assertEqual(f.readline(2), b"xy")
392 self.assertEqual(f.readline(4), b"zzy\n")
393 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000394 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000395 self.assertRaises(TypeError, f.readline, 5.3)
396 with self.open(support.TESTFN, "r") as f:
397 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000398
Serhiy Storchaka64aa4df2017-04-19 22:34:58 +0300399 def test_readline_nonsizeable(self):
400 # Issue #30061
401 # Crash when readline() returns an object without __len__
402 class R(self.IOBase):
403 def readline(self):
404 return None
405 self.assertRaises((TypeError, StopIteration), next, R())
406
407 def test_next_nonsizeable(self):
408 # Issue #30061
409 # Crash when next() returns an object without __len__
410 class R(self.IOBase):
411 def next(self):
412 return None
413 self.assertRaises(TypeError, R().readlines, 1)
414
Christian Heimes1a6387e2008-03-26 12:49:49 +0000415 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000416 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000417 self.write_ops(f)
418 data = f.getvalue()
419 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000420 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000421 self.read_ops(f, True)
422
423 def test_large_file_ops(self):
424 # On Windows and Mac OSX this test comsumes large resources; It takes
425 # a long time to build the >2GB file and takes >2GB of disk space
426 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000427 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware1f702212013-12-10 14:09:20 -0600428 support.requires(
429 'largefile',
430 'test requires %s bytes and a long time to run' % self.LARGE)
Antoine Pitrou19690592009-06-12 20:14:08 +0000431 with self.open(support.TESTFN, "w+b", 0) as f:
432 self.large_file_ops(f)
433 with self.open(support.TESTFN, "w+b") as f:
434 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000435
436 def test_with_open(self):
437 for bufsize in (0, 1, 100):
438 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000439 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000440 f.write(b"xxx")
441 self.assertEqual(f.closed, True)
442 f = None
443 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000444 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000445 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000446 except ZeroDivisionError:
447 self.assertEqual(f.closed, True)
448 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000449 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000450
Antoine Pitroue741cc62009-01-21 00:45:36 +0000451 # issue 5008
452 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000453 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000454 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000455 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000456 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000457 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000458 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000459 with self.open(support.TESTFN, "a") as f:
Serhiy Storchakaea4d2872015-08-02 15:19:04 +0300460 self.assertGreater(f.tell(), 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000461
Christian Heimes1a6387e2008-03-26 12:49:49 +0000462 def test_destructor(self):
463 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000464 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000465 def __del__(self):
466 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000467 try:
468 f = super(MyFileIO, self).__del__
469 except AttributeError:
470 pass
471 else:
472 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000473 def close(self):
474 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000475 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000476 def flush(self):
477 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000478 super(MyFileIO, self).flush()
479 f = MyFileIO(support.TESTFN, "wb")
480 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000481 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000482 support.gc_collect()
483 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000484 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000485 self.assertEqual(f.read(), b"xxx")
486
487 def _check_base_destructor(self, base):
488 record = []
489 class MyIO(base):
490 def __init__(self):
491 # This exercises the availability of attributes on object
492 # destruction.
493 # (in the C version, close() is called by the tp_dealloc
494 # function, not by __del__)
495 self.on_del = 1
496 self.on_close = 2
497 self.on_flush = 3
498 def __del__(self):
499 record.append(self.on_del)
500 try:
501 f = super(MyIO, self).__del__
502 except AttributeError:
503 pass
504 else:
505 f()
506 def close(self):
507 record.append(self.on_close)
508 super(MyIO, self).close()
509 def flush(self):
510 record.append(self.on_flush)
511 super(MyIO, self).flush()
512 f = MyIO()
513 del f
514 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000515 self.assertEqual(record, [1, 2, 3])
516
Antoine Pitrou19690592009-06-12 20:14:08 +0000517 def test_IOBase_destructor(self):
518 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000519
Antoine Pitrou19690592009-06-12 20:14:08 +0000520 def test_RawIOBase_destructor(self):
521 self._check_base_destructor(self.RawIOBase)
522
523 def test_BufferedIOBase_destructor(self):
524 self._check_base_destructor(self.BufferedIOBase)
525
526 def test_TextIOBase_destructor(self):
527 self._check_base_destructor(self.TextIOBase)
528
529 def test_close_flushes(self):
530 with self.open(support.TESTFN, "wb") as f:
531 f.write(b"xxx")
532 with self.open(support.TESTFN, "rb") as f:
533 self.assertEqual(f.read(), b"xxx")
534
535 def test_array_writes(self):
536 a = array.array(b'i', range(10))
537 n = len(a.tostring())
538 with self.open(support.TESTFN, "wb", 0) as f:
539 self.assertEqual(f.write(a), n)
540 with self.open(support.TESTFN, "wb") as f:
541 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000542
543 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000544 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000545 closefd=False)
546
Antoine Pitrou19690592009-06-12 20:14:08 +0000547 def test_read_closed(self):
548 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000549 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000550 with self.open(support.TESTFN, "r") as f:
551 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000552 self.assertEqual(file.read(), "egg\n")
553 file.seek(0)
554 file.close()
555 self.assertRaises(ValueError, file.read)
556
557 def test_no_closefd_with_filename(self):
558 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000559 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000560
561 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000562 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000563 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000564 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000565 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000566 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000567 self.assertEqual(file.buffer.raw.closefd, False)
568
Antoine Pitrou19690592009-06-12 20:14:08 +0000569 def test_garbage_collection(self):
570 # FileIO objects are collected, and collecting them flushes
571 # all data to disk.
572 f = self.FileIO(support.TESTFN, "wb")
573 f.write(b"abcxxx")
574 f.f = f
575 wr = weakref.ref(f)
576 del f
577 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +0300578 self.assertIsNone(wr(), wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000579 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000580 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000581
Antoine Pitrou19690592009-06-12 20:14:08 +0000582 def test_unbounded_file(self):
583 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
584 zero = "/dev/zero"
585 if not os.path.exists(zero):
586 self.skipTest("{0} does not exist".format(zero))
587 if sys.maxsize > 0x7FFFFFFF:
588 self.skipTest("test can only run in a 32-bit address space")
589 if support.real_max_memuse < support._2G:
590 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000591 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000592 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000593 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000594 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000595 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000596 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000597
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200598 def check_flush_error_on_close(self, *args, **kwargs):
599 # Test that the file is closed despite failed flush
600 # and that flush() is called before file closed.
601 f = self.open(*args, **kwargs)
602 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000603 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200604 closed[:] = [f.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000605 raise IOError()
606 f.flush = bad_flush
607 self.assertRaises(IOError, f.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600608 self.assertTrue(f.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200609 self.assertTrue(closed) # flush() called
610 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchaka437d5352015-02-23 00:28:38 +0200611 f.flush = lambda: None # break reference loop
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200612
613 def test_flush_error_on_close(self):
614 # raw file
615 # Issue #5700: io.FileIO calls flush() after file closed
616 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
617 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
618 self.check_flush_error_on_close(fd, 'wb', buffering=0)
619 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
620 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
621 os.close(fd)
622 # buffered io
623 self.check_flush_error_on_close(support.TESTFN, 'wb')
624 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
625 self.check_flush_error_on_close(fd, 'wb')
626 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
627 self.check_flush_error_on_close(fd, 'wb', closefd=False)
628 os.close(fd)
629 # text io
630 self.check_flush_error_on_close(support.TESTFN, 'w')
631 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
632 self.check_flush_error_on_close(fd, 'w')
633 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
634 self.check_flush_error_on_close(fd, 'w', closefd=False)
635 os.close(fd)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000636
637 def test_multi_close(self):
638 f = self.open(support.TESTFN, "wb", buffering=0)
639 f.close()
640 f.close()
641 f.close()
642 self.assertRaises(ValueError, f.flush)
643
Antoine Pitrou6391b342010-09-14 18:48:19 +0000644 def test_RawIOBase_read(self):
645 # Exercise the default RawIOBase.read() implementation (which calls
646 # readinto() internally).
647 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
648 self.assertEqual(rawio.read(2), b"ab")
649 self.assertEqual(rawio.read(2), b"c")
650 self.assertEqual(rawio.read(2), b"d")
651 self.assertEqual(rawio.read(2), None)
652 self.assertEqual(rawio.read(2), b"ef")
653 self.assertEqual(rawio.read(2), b"g")
654 self.assertEqual(rawio.read(2), None)
655 self.assertEqual(rawio.read(2), b"")
656
Hynek Schlawack877effc2012-05-25 09:24:18 +0200657 def test_fileio_closefd(self):
658 # Issue #4841
659 with self.open(__file__, 'rb') as f1, \
660 self.open(__file__, 'rb') as f2:
661 fileio = self.FileIO(f1.fileno(), closefd=False)
662 # .__init__() must not close f1
663 fileio.__init__(f2.fileno(), closefd=False)
664 f1.readline()
665 # .close() must not close f2
666 fileio.close()
667 f2.readline()
668
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +0300669 def test_nonbuffered_textio(self):
670 with warnings.catch_warnings(record=True) as recorded:
671 with self.assertRaises(ValueError):
672 self.open(support.TESTFN, 'w', buffering=0)
673 support.gc_collect()
674 self.assertEqual(recorded, [])
675
676 def test_invalid_newline(self):
677 with warnings.catch_warnings(record=True) as recorded:
678 with self.assertRaises(ValueError):
679 self.open(support.TESTFN, 'w', newline='invalid')
680 support.gc_collect()
681 self.assertEqual(recorded, [])
682
Martin Panterc9813d82016-06-03 05:59:20 +0000683 def test_buffered_readinto_mixin(self):
684 # Test the implementation provided by BufferedIOBase
685 class Stream(self.BufferedIOBase):
686 def read(self, size):
687 return b"12345"
688 stream = Stream()
689 buffer = byteslike(5)
690 self.assertEqual(stream.readinto(buffer), 5)
691 self.assertEqual(buffer.tobytes(), b"12345")
692
Hynek Schlawack877effc2012-05-25 09:24:18 +0200693
Antoine Pitrou19690592009-06-12 20:14:08 +0000694class CIOTest(IOTest):
Antoine Pitrou16166452011-07-12 22:04:20 +0200695
696 def test_IOBase_finalize(self):
697 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
698 # class which inherits IOBase and an object of this class are caught
699 # in a reference cycle and close() is already in the method cache.
700 class MyIO(self.IOBase):
701 def close(self):
702 pass
703
704 # create an instance to populate the method cache
705 MyIO()
706 obj = MyIO()
707 obj.obj = obj
708 wr = weakref.ref(obj)
709 del MyIO
710 del obj
711 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +0300712 self.assertIsNone(wr(), wr)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000713
Antoine Pitrou19690592009-06-12 20:14:08 +0000714class PyIOTest(IOTest):
715 test_array_writes = unittest.skip(
716 "len(array.array) returns number of elements rather than bytelength"
717 )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000718
719
Antoine Pitrou19690592009-06-12 20:14:08 +0000720class CommonBufferedTests:
721 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
722
723 def test_detach(self):
724 raw = self.MockRawIO()
725 buf = self.tp(raw)
726 self.assertIs(buf.detach(), raw)
727 self.assertRaises(ValueError, buf.detach)
728
Benjamin Peterson53ae6142014-12-21 20:51:50 -0600729 repr(buf) # Should still work
730
Antoine Pitrou19690592009-06-12 20:14:08 +0000731 def test_fileno(self):
732 rawio = self.MockRawIO()
733 bufio = self.tp(rawio)
734
Ezio Melotti2623a372010-11-21 13:34:58 +0000735 self.assertEqual(42, bufio.fileno())
Antoine Pitrou19690592009-06-12 20:14:08 +0000736
Antoine Pitrou19690592009-06-12 20:14:08 +0000737 def test_invalid_args(self):
738 rawio = self.MockRawIO()
739 bufio = self.tp(rawio)
740 # Invalid whence
741 self.assertRaises(ValueError, bufio.seek, 0, -1)
742 self.assertRaises(ValueError, bufio.seek, 0, 3)
743
744 def test_override_destructor(self):
745 tp = self.tp
746 record = []
747 class MyBufferedIO(tp):
748 def __del__(self):
749 record.append(1)
750 try:
751 f = super(MyBufferedIO, self).__del__
752 except AttributeError:
753 pass
754 else:
755 f()
756 def close(self):
757 record.append(2)
758 super(MyBufferedIO, self).close()
759 def flush(self):
760 record.append(3)
761 super(MyBufferedIO, self).flush()
762 rawio = self.MockRawIO()
763 bufio = MyBufferedIO(rawio)
764 writable = bufio.writable()
765 del bufio
766 support.gc_collect()
767 if writable:
768 self.assertEqual(record, [1, 2, 3])
769 else:
770 self.assertEqual(record, [1, 2])
771
772 def test_context_manager(self):
773 # Test usability as a context manager
774 rawio = self.MockRawIO()
775 bufio = self.tp(rawio)
776 def _with():
777 with bufio:
778 pass
779 _with()
780 # bufio should now be closed, and using it a second time should raise
781 # a ValueError.
782 self.assertRaises(ValueError, _with)
783
784 def test_error_through_destructor(self):
785 # Test that the exception state is not modified by a destructor,
786 # even if close() fails.
787 rawio = self.CloseFailureIO()
788 def f():
789 self.tp(rawio).xyzzy
790 with support.captured_output("stderr") as s:
791 self.assertRaises(AttributeError, f)
792 s = s.getvalue().strip()
793 if s:
794 # The destructor *may* have printed an unraisable error, check it
795 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000796 self.assertTrue(s.startswith("Exception IOError: "), s)
797 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000798
799 def test_repr(self):
800 raw = self.MockRawIO()
801 b = self.tp(raw)
802 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
803 self.assertEqual(repr(b), "<%s>" % clsname)
804 raw.name = "dummy"
805 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
806 raw.name = b"dummy"
807 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000808
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000809 def test_flush_error_on_close(self):
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200810 # Test that buffered file is closed despite failed flush
811 # and that flush() is called before file closed.
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000812 raw = self.MockRawIO()
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200813 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000814 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200815 closed[:] = [b.closed, raw.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000816 raise IOError()
817 raw.flush = bad_flush
818 b = self.tp(raw)
819 self.assertRaises(IOError, b.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600820 self.assertTrue(b.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200821 self.assertTrue(raw.closed)
822 self.assertTrue(closed) # flush() called
823 self.assertFalse(closed[0]) # flush() called before file closed
824 self.assertFalse(closed[1])
Serhiy Storchaka437d5352015-02-23 00:28:38 +0200825 raw.flush = lambda: None # break reference loop
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600826
827 def test_close_error_on_close(self):
828 raw = self.MockRawIO()
829 def bad_flush():
830 raise IOError('flush')
831 def bad_close():
832 raise IOError('close')
833 raw.close = bad_close
834 b = self.tp(raw)
835 b.flush = bad_flush
836 with self.assertRaises(IOError) as err: # exception not swallowed
837 b.close()
838 self.assertEqual(err.exception.args, ('close',))
839 self.assertFalse(b.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000840
841 def test_multi_close(self):
842 raw = self.MockRawIO()
843 b = self.tp(raw)
844 b.close()
845 b.close()
846 b.close()
847 self.assertRaises(ValueError, b.flush)
848
Antoine Pitroufc9ead62010-12-21 21:26:55 +0000849 def test_readonly_attributes(self):
850 raw = self.MockRawIO()
851 buf = self.tp(raw)
852 x = self.MockRawIO()
853 with self.assertRaises((AttributeError, TypeError)):
854 buf.raw = x
855
Christian Heimes1a6387e2008-03-26 12:49:49 +0000856
Antoine Pitroubff5df02012-07-29 19:02:46 +0200857class SizeofTest:
858
859 @support.cpython_only
860 def test_sizeof(self):
861 bufsize1 = 4096
862 bufsize2 = 8192
863 rawio = self.MockRawIO()
864 bufio = self.tp(rawio, buffer_size=bufsize1)
865 size = sys.getsizeof(bufio) - bufsize1
866 rawio = self.MockRawIO()
867 bufio = self.tp(rawio, buffer_size=bufsize2)
868 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
869
870
Antoine Pitrou19690592009-06-12 20:14:08 +0000871class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
872 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000873
Antoine Pitrou19690592009-06-12 20:14:08 +0000874 def test_constructor(self):
875 rawio = self.MockRawIO([b"abc"])
876 bufio = self.tp(rawio)
877 bufio.__init__(rawio)
878 bufio.__init__(rawio, buffer_size=1024)
879 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000880 self.assertEqual(b"abc", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000881 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
882 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
883 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
884 rawio = self.MockRawIO([b"abc"])
885 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000886 self.assertEqual(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000887
Serhiy Storchaka1d19f972014-02-12 10:52:07 +0200888 def test_uninitialized(self):
889 bufio = self.tp.__new__(self.tp)
890 del bufio
891 bufio = self.tp.__new__(self.tp)
892 self.assertRaisesRegexp((ValueError, AttributeError),
893 'uninitialized|has no attribute',
894 bufio.read, 0)
895 bufio.__init__(self.MockRawIO())
896 self.assertEqual(bufio.read(0), b'')
897
Antoine Pitrou19690592009-06-12 20:14:08 +0000898 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000899 for arg in (None, 7):
900 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
901 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000902 self.assertEqual(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000903 # Invalid args
904 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000905
Antoine Pitrou19690592009-06-12 20:14:08 +0000906 def test_read1(self):
907 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
908 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000909 self.assertEqual(b"a", bufio.read(1))
910 self.assertEqual(b"b", bufio.read1(1))
911 self.assertEqual(rawio._reads, 1)
912 self.assertEqual(b"c", bufio.read1(100))
913 self.assertEqual(rawio._reads, 1)
914 self.assertEqual(b"d", bufio.read1(100))
915 self.assertEqual(rawio._reads, 2)
916 self.assertEqual(b"efg", bufio.read1(100))
917 self.assertEqual(rawio._reads, 3)
918 self.assertEqual(b"", bufio.read1(100))
919 self.assertEqual(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000920 # Invalid args
921 self.assertRaises(ValueError, bufio.read1, -1)
922
923 def test_readinto(self):
924 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
925 bufio = self.tp(rawio)
926 b = bytearray(2)
Ezio Melotti2623a372010-11-21 13:34:58 +0000927 self.assertEqual(bufio.readinto(b), 2)
928 self.assertEqual(b, b"ab")
929 self.assertEqual(bufio.readinto(b), 2)
930 self.assertEqual(b, b"cd")
931 self.assertEqual(bufio.readinto(b), 2)
932 self.assertEqual(b, b"ef")
933 self.assertEqual(bufio.readinto(b), 1)
934 self.assertEqual(b, b"gf")
935 self.assertEqual(bufio.readinto(b), 0)
936 self.assertEqual(b, b"gf")
Antoine Pitrou19690592009-06-12 20:14:08 +0000937
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000938 def test_readlines(self):
939 def bufio():
940 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
941 return self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000942 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
943 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
944 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000945
Antoine Pitrou19690592009-06-12 20:14:08 +0000946 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000947 data = b"abcdefghi"
948 dlen = len(data)
949
950 tests = [
951 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
952 [ 100, [ 3, 3, 3], [ dlen ] ],
953 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
954 ]
955
956 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000957 rawio = self.MockFileIO(data)
958 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000959 pos = 0
960 for nbytes in buf_read_sizes:
Ezio Melotti2623a372010-11-21 13:34:58 +0000961 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Christian Heimes1a6387e2008-03-26 12:49:49 +0000962 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000963 # this is mildly implementation-dependent
Ezio Melotti2623a372010-11-21 13:34:58 +0000964 self.assertEqual(rawio.read_history, raw_read_sizes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000965
Antoine Pitrou19690592009-06-12 20:14:08 +0000966 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000967 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000968 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
969 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000970 self.assertEqual(b"abcd", bufio.read(6))
971 self.assertEqual(b"e", bufio.read(1))
972 self.assertEqual(b"fg", bufio.read())
973 self.assertEqual(b"", bufio.peek(1))
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200974 self.assertIsNone(bufio.read())
Ezio Melotti2623a372010-11-21 13:34:58 +0000975 self.assertEqual(b"", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000976
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200977 rawio = self.MockRawIO((b"a", None, None))
978 self.assertEqual(b"a", rawio.readall())
979 self.assertIsNone(rawio.readall())
980
Antoine Pitrou19690592009-06-12 20:14:08 +0000981 def test_read_past_eof(self):
982 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
983 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000984
Ezio Melotti2623a372010-11-21 13:34:58 +0000985 self.assertEqual(b"abcdefg", bufio.read(9000))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000986
Antoine Pitrou19690592009-06-12 20:14:08 +0000987 def test_read_all(self):
988 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
989 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000990
Ezio Melotti2623a372010-11-21 13:34:58 +0000991 self.assertEqual(b"abcdefg", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000992
Victor Stinner6a102812010-04-27 23:55:59 +0000993 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +0000994 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +0000995 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000996 try:
997 # Write out many bytes with exactly the same number of 0's,
998 # 1's... 255's. This will help us check that concurrent reading
999 # doesn't duplicate or forget contents.
1000 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +00001001 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001002 random.shuffle(l)
1003 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001004 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001005 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001006 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001007 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001008 errors = []
1009 results = []
1010 def f():
1011 try:
1012 # Intra-buffer read then buffer-flushing read
1013 for n in cycle([1, 19]):
1014 s = bufio.read(n)
1015 if not s:
1016 break
1017 # list.append() is atomic
1018 results.append(s)
1019 except Exception as e:
1020 errors.append(e)
1021 raise
1022 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03001023 with support.start_threads(threads):
1024 time.sleep(0.02) # yield
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001025 self.assertFalse(errors,
1026 "the following exceptions were caught: %r" % errors)
1027 s = b''.join(results)
1028 for i in range(256):
1029 c = bytes(bytearray([i]))
1030 self.assertEqual(s.count(c), N)
1031 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001032 support.unlink(support.TESTFN)
1033
1034 def test_misbehaved_io(self):
1035 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1036 bufio = self.tp(rawio)
1037 self.assertRaises(IOError, bufio.seek, 0)
1038 self.assertRaises(IOError, bufio.tell)
1039
Antoine Pitroucb4f47c2010-08-11 13:40:17 +00001040 def test_no_extraneous_read(self):
1041 # Issue #9550; when the raw IO object has satisfied the read request,
1042 # we should not issue any additional reads, otherwise it may block
1043 # (e.g. socket).
1044 bufsize = 16
1045 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1046 rawio = self.MockRawIO([b"x" * n])
1047 bufio = self.tp(rawio, bufsize)
1048 self.assertEqual(bufio.read(n), b"x" * n)
1049 # Simple case: one raw read is enough to satisfy the request.
1050 self.assertEqual(rawio._extraneous_reads, 0,
1051 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1052 # A more complex case where two raw reads are needed to satisfy
1053 # the request.
1054 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1055 bufio = self.tp(rawio, bufsize)
1056 self.assertEqual(bufio.read(n), b"x" * n)
1057 self.assertEqual(rawio._extraneous_reads, 0,
1058 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1059
1060
Antoine Pitroubff5df02012-07-29 19:02:46 +02001061class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001062 tp = io.BufferedReader
1063
1064 def test_constructor(self):
1065 BufferedReaderTest.test_constructor(self)
1066 # The allocation can succeed on 32-bit builds, e.g. with more
1067 # than 2GB RAM and a 64-bit kernel.
1068 if sys.maxsize > 0x7FFFFFFF:
1069 rawio = self.MockRawIO()
1070 bufio = self.tp(rawio)
1071 self.assertRaises((OverflowError, MemoryError, ValueError),
1072 bufio.__init__, rawio, sys.maxsize)
1073
1074 def test_initialization(self):
1075 rawio = self.MockRawIO([b"abc"])
1076 bufio = self.tp(rawio)
1077 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1078 self.assertRaises(ValueError, bufio.read)
1079 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1080 self.assertRaises(ValueError, bufio.read)
1081 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1082 self.assertRaises(ValueError, bufio.read)
1083
1084 def test_misbehaved_io_read(self):
1085 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1086 bufio = self.tp(rawio)
1087 # _pyio.BufferedReader seems to implement reading different, so that
1088 # checking this is not so easy.
1089 self.assertRaises(IOError, bufio.read, 10)
1090
1091 def test_garbage_collection(self):
1092 # C BufferedReader objects are collected.
1093 # The Python version has __del__, so it ends into gc.garbage instead
1094 rawio = self.FileIO(support.TESTFN, "w+b")
1095 f = self.tp(rawio)
1096 f.f = f
1097 wr = weakref.ref(f)
1098 del f
1099 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03001100 self.assertIsNone(wr(), wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00001101
R David Murray5b2cf5e2013-02-23 22:11:21 -05001102 def test_args_error(self):
1103 # Issue #17275
1104 with self.assertRaisesRegexp(TypeError, "BufferedReader"):
1105 self.tp(io.BytesIO(), 1024, 1024, 1024)
1106
1107
Antoine Pitrou19690592009-06-12 20:14:08 +00001108class PyBufferedReaderTest(BufferedReaderTest):
1109 tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001110
1111
Antoine Pitrou19690592009-06-12 20:14:08 +00001112class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1113 write_mode = "wb"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001114
Antoine Pitrou19690592009-06-12 20:14:08 +00001115 def test_constructor(self):
1116 rawio = self.MockRawIO()
1117 bufio = self.tp(rawio)
1118 bufio.__init__(rawio)
1119 bufio.__init__(rawio, buffer_size=1024)
1120 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +00001121 self.assertEqual(3, bufio.write(b"abc"))
Antoine Pitrou19690592009-06-12 20:14:08 +00001122 bufio.flush()
1123 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1124 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1125 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1126 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +00001127 self.assertEqual(3, bufio.write(b"ghi"))
Antoine Pitrou19690592009-06-12 20:14:08 +00001128 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001129 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001130
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001131 def test_uninitialized(self):
1132 bufio = self.tp.__new__(self.tp)
1133 del bufio
1134 bufio = self.tp.__new__(self.tp)
1135 self.assertRaisesRegexp((ValueError, AttributeError),
1136 'uninitialized|has no attribute',
1137 bufio.write, b'')
1138 bufio.__init__(self.MockRawIO())
1139 self.assertEqual(bufio.write(b''), 0)
1140
Antoine Pitrou19690592009-06-12 20:14:08 +00001141 def test_detach_flush(self):
1142 raw = self.MockRawIO()
1143 buf = self.tp(raw)
1144 buf.write(b"howdy!")
1145 self.assertFalse(raw._write_stack)
1146 buf.detach()
1147 self.assertEqual(raw._write_stack, [b"howdy!"])
1148
1149 def test_write(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001150 # Write to the buffered IO but don't overflow the buffer.
Antoine Pitrou19690592009-06-12 20:14:08 +00001151 writer = self.MockRawIO()
1152 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001153 bufio.write(b"abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001154 self.assertFalse(writer._write_stack)
Martin Panterc9813d82016-06-03 05:59:20 +00001155 buffer = bytearray(b"def")
1156 bufio.write(buffer)
1157 buffer[:] = b"***" # Overwrite our copy of the data
1158 bufio.flush()
1159 self.assertEqual(b"".join(writer._write_stack), b"abcdef")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001160
Antoine Pitrou19690592009-06-12 20:14:08 +00001161 def test_write_overflow(self):
1162 writer = self.MockRawIO()
1163 bufio = self.tp(writer, 8)
1164 contents = b"abcdefghijklmnop"
1165 for n in range(0, len(contents), 3):
1166 bufio.write(contents[n:n+3])
1167 flushed = b"".join(writer._write_stack)
1168 # At least (total - 8) bytes were implicitly flushed, perhaps more
1169 # depending on the implementation.
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001170 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001171
Antoine Pitrou19690592009-06-12 20:14:08 +00001172 def check_writes(self, intermediate_func):
1173 # Lots of writes, test the flushed output is as expected.
1174 contents = bytes(range(256)) * 1000
1175 n = 0
1176 writer = self.MockRawIO()
1177 bufio = self.tp(writer, 13)
1178 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1179 def gen_sizes():
1180 for size in count(1):
1181 for i in range(15):
1182 yield size
1183 sizes = gen_sizes()
1184 while n < len(contents):
1185 size = min(next(sizes), len(contents) - n)
Ezio Melotti2623a372010-11-21 13:34:58 +00001186 self.assertEqual(bufio.write(contents[n:n+size]), size)
Antoine Pitrou19690592009-06-12 20:14:08 +00001187 intermediate_func(bufio)
1188 n += size
1189 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001190 self.assertEqual(contents,
Antoine Pitrou19690592009-06-12 20:14:08 +00001191 b"".join(writer._write_stack))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001192
Antoine Pitrou19690592009-06-12 20:14:08 +00001193 def test_writes(self):
1194 self.check_writes(lambda bufio: None)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001195
Antoine Pitrou19690592009-06-12 20:14:08 +00001196 def test_writes_and_flushes(self):
1197 self.check_writes(lambda bufio: bufio.flush())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001198
Antoine Pitrou19690592009-06-12 20:14:08 +00001199 def test_writes_and_seeks(self):
1200 def _seekabs(bufio):
1201 pos = bufio.tell()
1202 bufio.seek(pos + 1, 0)
1203 bufio.seek(pos - 1, 0)
1204 bufio.seek(pos, 0)
1205 self.check_writes(_seekabs)
1206 def _seekrel(bufio):
1207 pos = bufio.seek(0, 1)
1208 bufio.seek(+1, 1)
1209 bufio.seek(-1, 1)
1210 bufio.seek(pos, 0)
1211 self.check_writes(_seekrel)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001212
Antoine Pitrou19690592009-06-12 20:14:08 +00001213 def test_writes_and_truncates(self):
1214 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001215
Antoine Pitrou19690592009-06-12 20:14:08 +00001216 def test_write_non_blocking(self):
1217 raw = self.MockNonBlockWriterIO()
1218 bufio = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001219
Ezio Melotti2623a372010-11-21 13:34:58 +00001220 self.assertEqual(bufio.write(b"abcd"), 4)
1221 self.assertEqual(bufio.write(b"efghi"), 5)
Antoine Pitrou19690592009-06-12 20:14:08 +00001222 # 1 byte will be written, the rest will be buffered
1223 raw.block_on(b"k")
Ezio Melotti2623a372010-11-21 13:34:58 +00001224 self.assertEqual(bufio.write(b"jklmn"), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001225
Antoine Pitrou19690592009-06-12 20:14:08 +00001226 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1227 raw.block_on(b"0")
1228 try:
1229 bufio.write(b"opqrwxyz0123456789")
1230 except self.BlockingIOError as e:
1231 written = e.characters_written
1232 else:
1233 self.fail("BlockingIOError should have been raised")
Ezio Melotti2623a372010-11-21 13:34:58 +00001234 self.assertEqual(written, 16)
1235 self.assertEqual(raw.pop_written(),
Antoine Pitrou19690592009-06-12 20:14:08 +00001236 b"abcdefghijklmnopqrwxyz")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001237
Ezio Melotti2623a372010-11-21 13:34:58 +00001238 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Antoine Pitrou19690592009-06-12 20:14:08 +00001239 s = raw.pop_written()
1240 # Previously buffered bytes were flushed
1241 self.assertTrue(s.startswith(b"01234567A"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001242
Antoine Pitrou19690592009-06-12 20:14:08 +00001243 def test_write_and_rewind(self):
1244 raw = io.BytesIO()
1245 bufio = self.tp(raw, 4)
1246 self.assertEqual(bufio.write(b"abcdef"), 6)
1247 self.assertEqual(bufio.tell(), 6)
1248 bufio.seek(0, 0)
1249 self.assertEqual(bufio.write(b"XY"), 2)
1250 bufio.seek(6, 0)
1251 self.assertEqual(raw.getvalue(), b"XYcdef")
1252 self.assertEqual(bufio.write(b"123456"), 6)
1253 bufio.flush()
1254 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001255
Antoine Pitrou19690592009-06-12 20:14:08 +00001256 def test_flush(self):
1257 writer = self.MockRawIO()
1258 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001259 bufio.write(b"abc")
1260 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001261 self.assertEqual(b"abc", writer._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001262
Antoine Pitrou78e761e2012-10-16 22:57:11 +02001263 def test_writelines(self):
1264 l = [b'ab', b'cd', b'ef']
1265 writer = self.MockRawIO()
1266 bufio = self.tp(writer, 8)
1267 bufio.writelines(l)
1268 bufio.flush()
1269 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1270
1271 def test_writelines_userlist(self):
1272 l = UserList([b'ab', b'cd', b'ef'])
1273 writer = self.MockRawIO()
1274 bufio = self.tp(writer, 8)
1275 bufio.writelines(l)
1276 bufio.flush()
1277 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1278
1279 def test_writelines_error(self):
1280 writer = self.MockRawIO()
1281 bufio = self.tp(writer, 8)
1282 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1283 self.assertRaises(TypeError, bufio.writelines, None)
1284
Antoine Pitrou19690592009-06-12 20:14:08 +00001285 def test_destructor(self):
1286 writer = self.MockRawIO()
1287 bufio = self.tp(writer, 8)
1288 bufio.write(b"abc")
1289 del bufio
1290 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001291 self.assertEqual(b"abc", writer._write_stack[0])
Antoine Pitrou19690592009-06-12 20:14:08 +00001292
1293 def test_truncate(self):
1294 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001295 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001296 bufio = self.tp(raw, 8)
1297 bufio.write(b"abcdef")
1298 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001299 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001300 with self.open(support.TESTFN, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001301 self.assertEqual(f.read(), b"abc")
1302
Victor Stinner6a102812010-04-27 23:55:59 +00001303 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +00001304 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +00001305 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001306 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001307 # Write out many bytes from many threads and test they were
1308 # all flushed.
1309 N = 1000
1310 contents = bytes(range(256)) * N
1311 sizes = cycle([1, 19])
1312 n = 0
1313 queue = deque()
1314 while n < len(contents):
1315 size = next(sizes)
1316 queue.append(contents[n:n+size])
1317 n += size
1318 del contents
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001319 # We use a real file object because it allows us to
1320 # exercise situations where the GIL is released before
1321 # writing the buffer to the raw streams. This is in addition
1322 # to concurrency issues due to switching threads in the middle
1323 # of Python code.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001324 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001325 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001326 errors = []
1327 def f():
1328 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001329 while True:
1330 try:
1331 s = queue.popleft()
1332 except IndexError:
1333 return
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001334 bufio.write(s)
1335 except Exception as e:
1336 errors.append(e)
1337 raise
1338 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03001339 with support.start_threads(threads):
1340 time.sleep(0.02) # yield
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001341 self.assertFalse(errors,
1342 "the following exceptions were caught: %r" % errors)
Antoine Pitrou19690592009-06-12 20:14:08 +00001343 bufio.close()
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001344 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001345 s = f.read()
1346 for i in range(256):
Ezio Melotti2623a372010-11-21 13:34:58 +00001347 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001348 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001349 support.unlink(support.TESTFN)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001350
Antoine Pitrou19690592009-06-12 20:14:08 +00001351 def test_misbehaved_io(self):
1352 rawio = self.MisbehavedRawIO()
1353 bufio = self.tp(rawio, 5)
1354 self.assertRaises(IOError, bufio.seek, 0)
1355 self.assertRaises(IOError, bufio.tell)
1356 self.assertRaises(IOError, bufio.write, b"abcdef")
1357
1358 def test_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001359 with support.check_warnings(("max_buffer_size is deprecated",
1360 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001361 self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001362
Benjamin Petersona2d6d712012-12-20 12:24:10 -06001363 def test_write_error_on_close(self):
1364 raw = self.MockRawIO()
1365 def bad_write(b):
1366 raise IOError()
1367 raw.write = bad_write
1368 b = self.tp(raw)
1369 b.write(b'spam')
1370 self.assertRaises(IOError, b.close) # exception not swallowed
1371 self.assertTrue(b.closed)
1372
Antoine Pitrou19690592009-06-12 20:14:08 +00001373
Antoine Pitroubff5df02012-07-29 19:02:46 +02001374class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001375 tp = io.BufferedWriter
1376
1377 def test_constructor(self):
1378 BufferedWriterTest.test_constructor(self)
1379 # The allocation can succeed on 32-bit builds, e.g. with more
1380 # than 2GB RAM and a 64-bit kernel.
1381 if sys.maxsize > 0x7FFFFFFF:
1382 rawio = self.MockRawIO()
1383 bufio = self.tp(rawio)
1384 self.assertRaises((OverflowError, MemoryError, ValueError),
1385 bufio.__init__, rawio, sys.maxsize)
1386
1387 def test_initialization(self):
1388 rawio = self.MockRawIO()
1389 bufio = self.tp(rawio)
1390 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1391 self.assertRaises(ValueError, bufio.write, b"def")
1392 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1393 self.assertRaises(ValueError, bufio.write, b"def")
1394 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1395 self.assertRaises(ValueError, bufio.write, b"def")
1396
1397 def test_garbage_collection(self):
1398 # C BufferedWriter objects are collected, and collecting them flushes
1399 # all data to disk.
1400 # The Python version has __del__, so it ends into gc.garbage instead
1401 rawio = self.FileIO(support.TESTFN, "w+b")
1402 f = self.tp(rawio)
1403 f.write(b"123xxx")
1404 f.x = f
1405 wr = weakref.ref(f)
1406 del f
1407 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03001408 self.assertIsNone(wr(), wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001409 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001410 self.assertEqual(f.read(), b"123xxx")
1411
R David Murray5b2cf5e2013-02-23 22:11:21 -05001412 def test_args_error(self):
1413 # Issue #17275
1414 with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
1415 self.tp(io.BytesIO(), 1024, 1024, 1024)
1416
Antoine Pitrou19690592009-06-12 20:14:08 +00001417
1418class PyBufferedWriterTest(BufferedWriterTest):
1419 tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001420
1421class BufferedRWPairTest(unittest.TestCase):
1422
Antoine Pitrou19690592009-06-12 20:14:08 +00001423 def test_constructor(self):
1424 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001425 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001426
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001427 def test_uninitialized(self):
1428 pair = self.tp.__new__(self.tp)
1429 del pair
1430 pair = self.tp.__new__(self.tp)
1431 self.assertRaisesRegexp((ValueError, AttributeError),
1432 'uninitialized|has no attribute',
1433 pair.read, 0)
1434 self.assertRaisesRegexp((ValueError, AttributeError),
1435 'uninitialized|has no attribute',
1436 pair.write, b'')
1437 pair.__init__(self.MockRawIO(), self.MockRawIO())
1438 self.assertEqual(pair.read(0), b'')
1439 self.assertEqual(pair.write(b''), 0)
1440
Antoine Pitrou19690592009-06-12 20:14:08 +00001441 def test_detach(self):
1442 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1443 self.assertRaises(self.UnsupportedOperation, pair.detach)
1444
1445 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001446 with support.check_warnings(("max_buffer_size is deprecated",
1447 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001448 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001449
1450 def test_constructor_with_not_readable(self):
1451 class NotReadable(MockRawIO):
1452 def readable(self):
1453 return False
1454
1455 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1456
1457 def test_constructor_with_not_writeable(self):
1458 class NotWriteable(MockRawIO):
1459 def writable(self):
1460 return False
1461
1462 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1463
1464 def test_read(self):
1465 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1466
1467 self.assertEqual(pair.read(3), b"abc")
1468 self.assertEqual(pair.read(1), b"d")
1469 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001470 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1471 self.assertEqual(pair.read(None), b"abc")
1472
1473 def test_readlines(self):
1474 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1475 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1476 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1477 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001478
1479 def test_read1(self):
1480 # .read1() is delegated to the underlying reader object, so this test
1481 # can be shallow.
1482 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1483
1484 self.assertEqual(pair.read1(3), b"abc")
1485
1486 def test_readinto(self):
1487 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1488
Martin Panterc9813d82016-06-03 05:59:20 +00001489 data = byteslike(5)
Antoine Pitrou19690592009-06-12 20:14:08 +00001490 self.assertEqual(pair.readinto(data), 5)
Martin Panterc9813d82016-06-03 05:59:20 +00001491 self.assertEqual(data.tobytes(), b"abcde")
Antoine Pitrou19690592009-06-12 20:14:08 +00001492
1493 def test_write(self):
1494 w = self.MockRawIO()
1495 pair = self.tp(self.MockRawIO(), w)
1496
1497 pair.write(b"abc")
1498 pair.flush()
Martin Panterc9813d82016-06-03 05:59:20 +00001499 buffer = bytearray(b"def")
1500 pair.write(buffer)
1501 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitrou19690592009-06-12 20:14:08 +00001502 pair.flush()
1503 self.assertEqual(w._write_stack, [b"abc", b"def"])
1504
1505 def test_peek(self):
1506 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1507
1508 self.assertTrue(pair.peek(3).startswith(b"abc"))
1509 self.assertEqual(pair.read(3), b"abc")
1510
1511 def test_readable(self):
1512 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1513 self.assertTrue(pair.readable())
1514
1515 def test_writeable(self):
1516 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1517 self.assertTrue(pair.writable())
1518
1519 def test_seekable(self):
1520 # BufferedRWPairs are never seekable, even if their readers and writers
1521 # are.
1522 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1523 self.assertFalse(pair.seekable())
1524
1525 # .flush() is delegated to the underlying writer object and has been
1526 # tested in the test_write method.
1527
1528 def test_close_and_closed(self):
1529 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1530 self.assertFalse(pair.closed)
1531 pair.close()
1532 self.assertTrue(pair.closed)
1533
Serhiy Storchakaf95a57f2015-03-24 23:23:42 +02001534 def test_reader_close_error_on_close(self):
1535 def reader_close():
1536 reader_non_existing
1537 reader = self.MockRawIO()
1538 reader.close = reader_close
1539 writer = self.MockRawIO()
1540 pair = self.tp(reader, writer)
1541 with self.assertRaises(NameError) as err:
1542 pair.close()
1543 self.assertIn('reader_non_existing', str(err.exception))
1544 self.assertTrue(pair.closed)
1545 self.assertFalse(reader.closed)
1546 self.assertTrue(writer.closed)
1547
1548 def test_writer_close_error_on_close(self):
1549 def writer_close():
1550 writer_non_existing
1551 reader = self.MockRawIO()
1552 writer = self.MockRawIO()
1553 writer.close = writer_close
1554 pair = self.tp(reader, writer)
1555 with self.assertRaises(NameError) as err:
1556 pair.close()
1557 self.assertIn('writer_non_existing', str(err.exception))
1558 self.assertFalse(pair.closed)
1559 self.assertTrue(reader.closed)
1560 self.assertFalse(writer.closed)
1561
1562 def test_reader_writer_close_error_on_close(self):
1563 def reader_close():
1564 reader_non_existing
1565 def writer_close():
1566 writer_non_existing
1567 reader = self.MockRawIO()
1568 reader.close = reader_close
1569 writer = self.MockRawIO()
1570 writer.close = writer_close
1571 pair = self.tp(reader, writer)
1572 with self.assertRaises(NameError) as err:
1573 pair.close()
1574 self.assertIn('reader_non_existing', str(err.exception))
1575 self.assertFalse(pair.closed)
1576 self.assertFalse(reader.closed)
1577 self.assertFalse(writer.closed)
1578
Antoine Pitrou19690592009-06-12 20:14:08 +00001579 def test_isatty(self):
1580 class SelectableIsAtty(MockRawIO):
1581 def __init__(self, isatty):
1582 MockRawIO.__init__(self)
1583 self._isatty = isatty
1584
1585 def isatty(self):
1586 return self._isatty
1587
1588 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1589 self.assertFalse(pair.isatty())
1590
1591 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1592 self.assertTrue(pair.isatty())
1593
1594 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1595 self.assertTrue(pair.isatty())
1596
1597 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1598 self.assertTrue(pair.isatty())
1599
Benjamin Peterson1c873bf2014-09-29 22:46:57 -04001600 def test_weakref_clearing(self):
1601 brw = self.tp(self.MockRawIO(), self.MockRawIO())
1602 ref = weakref.ref(brw)
1603 brw = None
1604 ref = None # Shouldn't segfault.
1605
Antoine Pitrou19690592009-06-12 20:14:08 +00001606class CBufferedRWPairTest(BufferedRWPairTest):
1607 tp = io.BufferedRWPair
1608
1609class PyBufferedRWPairTest(BufferedRWPairTest):
1610 tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001611
1612
Antoine Pitrou19690592009-06-12 20:14:08 +00001613class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1614 read_mode = "rb+"
1615 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001616
Antoine Pitrou19690592009-06-12 20:14:08 +00001617 def test_constructor(self):
1618 BufferedReaderTest.test_constructor(self)
1619 BufferedWriterTest.test_constructor(self)
1620
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001621 def test_uninitialized(self):
1622 BufferedReaderTest.test_uninitialized(self)
1623 BufferedWriterTest.test_uninitialized(self)
1624
Antoine Pitrou19690592009-06-12 20:14:08 +00001625 def test_read_and_write(self):
1626 raw = self.MockRawIO((b"asdf", b"ghjk"))
1627 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001628
1629 self.assertEqual(b"as", rw.read(2))
1630 rw.write(b"ddd")
1631 rw.write(b"eee")
1632 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001633 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001634 self.assertEqual(b"dddeee", raw._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001635
Antoine Pitrou19690592009-06-12 20:14:08 +00001636 def test_seek_and_tell(self):
1637 raw = self.BytesIO(b"asdfghjkl")
1638 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001639
Ezio Melotti2623a372010-11-21 13:34:58 +00001640 self.assertEqual(b"as", rw.read(2))
1641 self.assertEqual(2, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001642 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001643 self.assertEqual(b"asdf", rw.read(4))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001644
Antoine Pitrou808cec52011-08-20 15:40:58 +02001645 rw.write(b"123f")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001646 rw.seek(0, 0)
Antoine Pitrou808cec52011-08-20 15:40:58 +02001647 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001648 self.assertEqual(9, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001649 rw.seek(-4, 2)
Ezio Melotti2623a372010-11-21 13:34:58 +00001650 self.assertEqual(5, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001651 rw.seek(2, 1)
Ezio Melotti2623a372010-11-21 13:34:58 +00001652 self.assertEqual(7, rw.tell())
1653 self.assertEqual(b"fl", rw.read(11))
Antoine Pitrou808cec52011-08-20 15:40:58 +02001654 rw.flush()
1655 self.assertEqual(b"asdf123fl", raw.getvalue())
1656
Christian Heimes1a6387e2008-03-26 12:49:49 +00001657 self.assertRaises(TypeError, rw.seek, 0.0)
1658
Antoine Pitrou19690592009-06-12 20:14:08 +00001659 def check_flush_and_read(self, read_func):
1660 raw = self.BytesIO(b"abcdefghi")
1661 bufio = self.tp(raw)
1662
Ezio Melotti2623a372010-11-21 13:34:58 +00001663 self.assertEqual(b"ab", read_func(bufio, 2))
Antoine Pitrou19690592009-06-12 20:14:08 +00001664 bufio.write(b"12")
Ezio Melotti2623a372010-11-21 13:34:58 +00001665 self.assertEqual(b"ef", read_func(bufio, 2))
1666 self.assertEqual(6, bufio.tell())
Antoine Pitrou19690592009-06-12 20:14:08 +00001667 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001668 self.assertEqual(6, bufio.tell())
1669 self.assertEqual(b"ghi", read_func(bufio))
Antoine Pitrou19690592009-06-12 20:14:08 +00001670 raw.seek(0, 0)
1671 raw.write(b"XYZ")
1672 # flush() resets the read buffer
1673 bufio.flush()
1674 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001675 self.assertEqual(b"XYZ", read_func(bufio, 3))
Antoine Pitrou19690592009-06-12 20:14:08 +00001676
1677 def test_flush_and_read(self):
1678 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1679
1680 def test_flush_and_readinto(self):
1681 def _readinto(bufio, n=-1):
1682 b = bytearray(n if n >= 0 else 9999)
1683 n = bufio.readinto(b)
1684 return bytes(b[:n])
1685 self.check_flush_and_read(_readinto)
1686
1687 def test_flush_and_peek(self):
1688 def _peek(bufio, n=-1):
1689 # This relies on the fact that the buffer can contain the whole
1690 # raw stream, otherwise peek() can return less.
1691 b = bufio.peek(n)
1692 if n != -1:
1693 b = b[:n]
1694 bufio.seek(len(b), 1)
1695 return b
1696 self.check_flush_and_read(_peek)
1697
1698 def test_flush_and_write(self):
1699 raw = self.BytesIO(b"abcdefghi")
1700 bufio = self.tp(raw)
1701
1702 bufio.write(b"123")
1703 bufio.flush()
1704 bufio.write(b"45")
1705 bufio.flush()
1706 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001707 self.assertEqual(b"12345fghi", raw.getvalue())
1708 self.assertEqual(b"12345fghi", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +00001709
1710 def test_threads(self):
1711 BufferedReaderTest.test_threads(self)
1712 BufferedWriterTest.test_threads(self)
1713
1714 def test_writes_and_peek(self):
1715 def _peek(bufio):
1716 bufio.peek(1)
1717 self.check_writes(_peek)
1718 def _peek(bufio):
1719 pos = bufio.tell()
1720 bufio.seek(-1, 1)
1721 bufio.peek(1)
1722 bufio.seek(pos, 0)
1723 self.check_writes(_peek)
1724
1725 def test_writes_and_reads(self):
1726 def _read(bufio):
1727 bufio.seek(-1, 1)
1728 bufio.read(1)
1729 self.check_writes(_read)
1730
1731 def test_writes_and_read1s(self):
1732 def _read1(bufio):
1733 bufio.seek(-1, 1)
1734 bufio.read1(1)
1735 self.check_writes(_read1)
1736
1737 def test_writes_and_readintos(self):
1738 def _read(bufio):
1739 bufio.seek(-1, 1)
1740 bufio.readinto(bytearray(1))
1741 self.check_writes(_read)
1742
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001743 def test_write_after_readahead(self):
1744 # Issue #6629: writing after the buffer was filled by readahead should
1745 # first rewind the raw stream.
1746 for overwrite_size in [1, 5]:
1747 raw = self.BytesIO(b"A" * 10)
1748 bufio = self.tp(raw, 4)
1749 # Trigger readahead
1750 self.assertEqual(bufio.read(1), b"A")
1751 self.assertEqual(bufio.tell(), 1)
1752 # Overwriting should rewind the raw stream if it needs so
1753 bufio.write(b"B" * overwrite_size)
1754 self.assertEqual(bufio.tell(), overwrite_size + 1)
1755 # If the write size was smaller than the buffer size, flush() and
1756 # check that rewind happens.
1757 bufio.flush()
1758 self.assertEqual(bufio.tell(), overwrite_size + 1)
1759 s = raw.getvalue()
1760 self.assertEqual(s,
1761 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1762
Antoine Pitrouee46a7b2011-05-13 00:31:52 +02001763 def test_write_rewind_write(self):
1764 # Various combinations of reading / writing / seeking backwards / writing again
1765 def mutate(bufio, pos1, pos2):
1766 assert pos2 >= pos1
1767 # Fill the buffer
1768 bufio.seek(pos1)
1769 bufio.read(pos2 - pos1)
1770 bufio.write(b'\x02')
1771 # This writes earlier than the previous write, but still inside
1772 # the buffer.
1773 bufio.seek(pos1)
1774 bufio.write(b'\x01')
1775
1776 b = b"\x80\x81\x82\x83\x84"
1777 for i in range(0, len(b)):
1778 for j in range(i, len(b)):
1779 raw = self.BytesIO(b)
1780 bufio = self.tp(raw, 100)
1781 mutate(bufio, i, j)
1782 bufio.flush()
1783 expected = bytearray(b)
1784 expected[j] = 2
1785 expected[i] = 1
1786 self.assertEqual(raw.getvalue(), expected,
1787 "failed result for i=%d, j=%d" % (i, j))
1788
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001789 def test_truncate_after_read_or_write(self):
1790 raw = self.BytesIO(b"A" * 10)
1791 bufio = self.tp(raw, 100)
1792 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1793 self.assertEqual(bufio.truncate(), 2)
1794 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1795 self.assertEqual(bufio.truncate(), 4)
1796
Antoine Pitrou19690592009-06-12 20:14:08 +00001797 def test_misbehaved_io(self):
1798 BufferedReaderTest.test_misbehaved_io(self)
1799 BufferedWriterTest.test_misbehaved_io(self)
1800
Antoine Pitrou808cec52011-08-20 15:40:58 +02001801 def test_interleaved_read_write(self):
1802 # Test for issue #12213
1803 with self.BytesIO(b'abcdefgh') as raw:
1804 with self.tp(raw, 100) as f:
1805 f.write(b"1")
1806 self.assertEqual(f.read(1), b'b')
1807 f.write(b'2')
1808 self.assertEqual(f.read1(1), b'd')
1809 f.write(b'3')
1810 buf = bytearray(1)
1811 f.readinto(buf)
1812 self.assertEqual(buf, b'f')
1813 f.write(b'4')
1814 self.assertEqual(f.peek(1), b'h')
1815 f.flush()
1816 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1817
1818 with self.BytesIO(b'abc') as raw:
1819 with self.tp(raw, 100) as f:
1820 self.assertEqual(f.read(1), b'a')
1821 f.write(b"2")
1822 self.assertEqual(f.read(1), b'c')
1823 f.flush()
1824 self.assertEqual(raw.getvalue(), b'a2c')
1825
1826 def test_interleaved_readline_write(self):
1827 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1828 with self.tp(raw) as f:
1829 f.write(b'1')
1830 self.assertEqual(f.readline(), b'b\n')
1831 f.write(b'2')
1832 self.assertEqual(f.readline(), b'def\n')
1833 f.write(b'3')
1834 self.assertEqual(f.readline(), b'\n')
1835 f.flush()
1836 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1837
R David Murray5b2cf5e2013-02-23 22:11:21 -05001838
Antoine Pitroubff5df02012-07-29 19:02:46 +02001839class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
1840 BufferedRandomTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001841 tp = io.BufferedRandom
1842
1843 def test_constructor(self):
1844 BufferedRandomTest.test_constructor(self)
1845 # The allocation can succeed on 32-bit builds, e.g. with more
1846 # than 2GB RAM and a 64-bit kernel.
1847 if sys.maxsize > 0x7FFFFFFF:
1848 rawio = self.MockRawIO()
1849 bufio = self.tp(rawio)
1850 self.assertRaises((OverflowError, MemoryError, ValueError),
1851 bufio.__init__, rawio, sys.maxsize)
1852
1853 def test_garbage_collection(self):
1854 CBufferedReaderTest.test_garbage_collection(self)
1855 CBufferedWriterTest.test_garbage_collection(self)
1856
R David Murray5b2cf5e2013-02-23 22:11:21 -05001857 def test_args_error(self):
1858 # Issue #17275
1859 with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
1860 self.tp(io.BytesIO(), 1024, 1024, 1024)
1861
1862
Antoine Pitrou19690592009-06-12 20:14:08 +00001863class PyBufferedRandomTest(BufferedRandomTest):
1864 tp = pyio.BufferedRandom
1865
1866
Christian Heimes1a6387e2008-03-26 12:49:49 +00001867# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1868# properties:
1869# - A single output character can correspond to many bytes of input.
1870# - The number of input bytes to complete the character can be
1871# undetermined until the last input byte is received.
1872# - The number of input bytes can vary depending on previous input.
1873# - A single input byte can correspond to many characters of output.
1874# - The number of output characters can be undetermined until the
1875# last input byte is received.
1876# - The number of output characters can vary depending on previous input.
1877
1878class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1879 """
1880 For testing seek/tell behavior with a stateful, buffering decoder.
1881
1882 Input is a sequence of words. Words may be fixed-length (length set
1883 by input) or variable-length (period-terminated). In variable-length
1884 mode, extra periods are ignored. Possible words are:
1885 - 'i' followed by a number sets the input length, I (maximum 99).
1886 When I is set to 0, words are space-terminated.
1887 - 'o' followed by a number sets the output length, O (maximum 99).
1888 - Any other word is converted into a word followed by a period on
1889 the output. The output word consists of the input word truncated
1890 or padded out with hyphens to make its length equal to O. If O
1891 is 0, the word is output verbatim without truncating or padding.
1892 I and O are initially set to 1. When I changes, any buffered input is
1893 re-scanned according to the new I. EOF also terminates the last word.
1894 """
1895
1896 def __init__(self, errors='strict'):
1897 codecs.IncrementalDecoder.__init__(self, errors)
1898 self.reset()
1899
1900 def __repr__(self):
1901 return '<SID %x>' % id(self)
1902
1903 def reset(self):
1904 self.i = 1
1905 self.o = 1
1906 self.buffer = bytearray()
1907
1908 def getstate(self):
1909 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1910 return bytes(self.buffer), i*100 + o
1911
1912 def setstate(self, state):
1913 buffer, io = state
1914 self.buffer = bytearray(buffer)
1915 i, o = divmod(io, 100)
1916 self.i, self.o = i ^ 1, o ^ 1
1917
1918 def decode(self, input, final=False):
1919 output = ''
1920 for b in input:
1921 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001922 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +00001923 if self.buffer:
1924 output += self.process_word()
1925 else:
1926 self.buffer.append(b)
1927 else: # fixed-length, terminate after self.i bytes
1928 self.buffer.append(b)
1929 if len(self.buffer) == self.i:
1930 output += self.process_word()
1931 if final and self.buffer: # EOF terminates the last word
1932 output += self.process_word()
1933 return output
1934
1935 def process_word(self):
1936 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001937 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001938 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001939 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001940 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1941 else:
1942 output = self.buffer.decode('ascii')
1943 if len(output) < self.o:
1944 output += '-'*self.o # pad out with hyphens
1945 if self.o:
1946 output = output[:self.o] # truncate to output length
1947 output += '.'
1948 self.buffer = bytearray()
1949 return output
1950
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001951 codecEnabled = False
1952
1953 @classmethod
1954 def lookupTestDecoder(cls, name):
1955 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001956 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001957 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001958 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001959 incrementalencoder=None,
1960 streamreader=None, streamwriter=None,
1961 incrementaldecoder=cls)
1962
1963# Register the previous decoder for testing.
1964# Disabled by default, tests will enable it.
1965codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1966
1967
Christian Heimes1a6387e2008-03-26 12:49:49 +00001968class StatefulIncrementalDecoderTest(unittest.TestCase):
1969 """
1970 Make sure the StatefulIncrementalDecoder actually works.
1971 """
1972
1973 test_cases = [
1974 # I=1, O=1 (fixed-length input == fixed-length output)
1975 (b'abcd', False, 'a.b.c.d.'),
1976 # I=0, O=0 (variable-length input, variable-length output)
1977 (b'oiabcd', True, 'abcd.'),
1978 # I=0, O=0 (should ignore extra periods)
1979 (b'oi...abcd...', True, 'abcd.'),
1980 # I=0, O=6 (variable-length input, fixed-length output)
1981 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1982 # I=2, O=6 (fixed-length input < fixed-length output)
1983 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1984 # I=6, O=3 (fixed-length input > fixed-length output)
1985 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1986 # I=0, then 3; O=29, then 15 (with longer output)
1987 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1988 'a----------------------------.' +
1989 'b----------------------------.' +
1990 'cde--------------------------.' +
1991 'abcdefghijabcde.' +
1992 'a.b------------.' +
1993 '.c.------------.' +
1994 'd.e------------.' +
1995 'k--------------.' +
1996 'l--------------.' +
1997 'm--------------.')
1998 ]
1999
Antoine Pitrou19690592009-06-12 20:14:08 +00002000 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002001 # Try a few one-shot test cases.
2002 for input, eof, output in self.test_cases:
2003 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00002004 self.assertEqual(d.decode(input, eof), output)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002005
2006 # Also test an unfinished decode, followed by forcing EOF.
2007 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00002008 self.assertEqual(d.decode(b'oiabcd'), '')
2009 self.assertEqual(d.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00002010
2011class TextIOWrapperTest(unittest.TestCase):
2012
2013 def setUp(self):
2014 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2015 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00002016 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002017
2018 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002019 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002020
Antoine Pitrou19690592009-06-12 20:14:08 +00002021 def test_constructor(self):
2022 r = self.BytesIO(b"\xc3\xa9\n\n")
2023 b = self.BufferedReader(r, 1000)
2024 t = self.TextIOWrapper(b)
2025 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti2623a372010-11-21 13:34:58 +00002026 self.assertEqual(t.encoding, "latin1")
2027 self.assertEqual(t.line_buffering, False)
Antoine Pitrou19690592009-06-12 20:14:08 +00002028 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti2623a372010-11-21 13:34:58 +00002029 self.assertEqual(t.encoding, "utf8")
2030 self.assertEqual(t.line_buffering, True)
2031 self.assertEqual("\xe9\n", t.readline())
Antoine Pitrou19690592009-06-12 20:14:08 +00002032 self.assertRaises(TypeError, t.__init__, b, newline=42)
2033 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2034
Serhiy Storchakaabb7e652015-04-16 11:56:35 +03002035 def test_uninitialized(self):
2036 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2037 del t
2038 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2039 self.assertRaises(Exception, repr, t)
2040 self.assertRaisesRegexp((ValueError, AttributeError),
2041 'uninitialized|has no attribute',
2042 t.read, 0)
2043 t.__init__(self.MockRawIO())
2044 self.assertEqual(t.read(0), u'')
2045
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002046 def test_non_text_encoding_codecs_are_rejected(self):
2047 # Ensure the constructor complains if passed a codec that isn't
2048 # marked as a text encoding
2049 # http://bugs.python.org/issue20404
2050 r = self.BytesIO()
2051 b = self.BufferedWriter(r)
2052 with support.check_py3k_warnings():
2053 self.TextIOWrapper(b, encoding="hex_codec")
2054
Antoine Pitrou19690592009-06-12 20:14:08 +00002055 def test_detach(self):
2056 r = self.BytesIO()
2057 b = self.BufferedWriter(r)
2058 t = self.TextIOWrapper(b)
2059 self.assertIs(t.detach(), b)
2060
2061 t = self.TextIOWrapper(b, encoding="ascii")
2062 t.write("howdy")
2063 self.assertFalse(r.getvalue())
2064 t.detach()
2065 self.assertEqual(r.getvalue(), b"howdy")
2066 self.assertRaises(ValueError, t.detach)
2067
Benjamin Peterson53ae6142014-12-21 20:51:50 -06002068 # Operations independent of the detached stream should still work
2069 repr(t)
2070 self.assertEqual(t.encoding, "ascii")
2071 self.assertEqual(t.errors, "strict")
2072 self.assertFalse(t.line_buffering)
2073
Antoine Pitrou19690592009-06-12 20:14:08 +00002074 def test_repr(self):
2075 raw = self.BytesIO("hello".encode("utf-8"))
2076 b = self.BufferedReader(raw)
2077 t = self.TextIOWrapper(b, encoding="utf-8")
2078 modname = self.TextIOWrapper.__module__
2079 self.assertEqual(repr(t),
2080 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2081 raw.name = "dummy"
2082 self.assertEqual(repr(t),
2083 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
2084 raw.name = b"dummy"
2085 self.assertEqual(repr(t),
2086 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
2087
Benjamin Peterson53ae6142014-12-21 20:51:50 -06002088 t.buffer.detach()
2089 repr(t) # Should not raise an exception
2090
Antoine Pitrou19690592009-06-12 20:14:08 +00002091 def test_line_buffering(self):
2092 r = self.BytesIO()
2093 b = self.BufferedWriter(r, 1000)
2094 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
2095 t.write("X")
Ezio Melotti2623a372010-11-21 13:34:58 +00002096 self.assertEqual(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00002097 t.write("Y\nZ")
Ezio Melotti2623a372010-11-21 13:34:58 +00002098 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00002099 t.write("A\rB")
Ezio Melotti2623a372010-11-21 13:34:58 +00002100 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002101
Antoine Pitrou19690592009-06-12 20:14:08 +00002102 def test_encoding(self):
2103 # Check the encoding attribute is always set, and valid
2104 b = self.BytesIO()
2105 t = self.TextIOWrapper(b, encoding="utf8")
2106 self.assertEqual(t.encoding, "utf8")
2107 t = self.TextIOWrapper(b)
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03002108 self.assertIsNotNone(t.encoding)
Antoine Pitrou19690592009-06-12 20:14:08 +00002109 codecs.lookup(t.encoding)
2110
2111 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002112 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00002113 b = self.BytesIO(b"abc\n\xff\n")
2114 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002115 self.assertRaises(UnicodeError, t.read)
2116 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00002117 b = self.BytesIO(b"abc\n\xff\n")
2118 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002119 self.assertRaises(UnicodeError, t.read)
2120 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00002121 b = self.BytesIO(b"abc\n\xff\n")
2122 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti2623a372010-11-21 13:34:58 +00002123 self.assertEqual(t.read(), "abc\n\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002124 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00002125 b = self.BytesIO(b"abc\n\xff\n")
2126 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti2623a372010-11-21 13:34:58 +00002127 self.assertEqual(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002128
Antoine Pitrou19690592009-06-12 20:14:08 +00002129 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002130 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00002131 b = self.BytesIO()
2132 t = self.TextIOWrapper(b, encoding="ascii")
2133 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002134 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00002135 b = self.BytesIO()
2136 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2137 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002138 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00002139 b = self.BytesIO()
2140 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00002141 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00002142 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002143 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002144 self.assertEqual(b.getvalue(), b"abcdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002145 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00002146 b = self.BytesIO()
2147 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00002148 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00002149 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002150 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002151 self.assertEqual(b.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002152
Antoine Pitrou19690592009-06-12 20:14:08 +00002153 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002154 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2155
2156 tests = [
2157 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
2158 [ '', input_lines ],
2159 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2160 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2161 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
2162 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002163 encodings = (
2164 'utf-8', 'latin-1',
2165 'utf-16', 'utf-16-le', 'utf-16-be',
2166 'utf-32', 'utf-32-le', 'utf-32-be',
2167 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00002168
2169 # Try a range of buffer sizes to test the case where \r is the last
2170 # character in TextIOWrapper._pending_line.
2171 for encoding in encodings:
2172 # XXX: str.encode() should return bytes
2173 data = bytes(''.join(input_lines).encode(encoding))
2174 for do_reads in (False, True):
2175 for bufsize in range(1, 10):
2176 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002177 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2178 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00002179 encoding=encoding)
2180 if do_reads:
2181 got_lines = []
2182 while True:
2183 c2 = textio.read(2)
2184 if c2 == '':
2185 break
Ezio Melotti2623a372010-11-21 13:34:58 +00002186 self.assertEqual(len(c2), 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002187 got_lines.append(c2 + textio.readline())
2188 else:
2189 got_lines = list(textio)
2190
2191 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti2623a372010-11-21 13:34:58 +00002192 self.assertEqual(got_line, exp_line)
2193 self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002194
Antoine Pitrou19690592009-06-12 20:14:08 +00002195 def test_newlines_input(self):
2196 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002197 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2198 for newline, expected in [
2199 (None, normalized.decode("ascii").splitlines(True)),
2200 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00002201 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2202 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2203 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00002204 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00002205 buf = self.BytesIO(testdata)
2206 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti2623a372010-11-21 13:34:58 +00002207 self.assertEqual(txt.readlines(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002208 txt.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002209 self.assertEqual(txt.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002210
Antoine Pitrou19690592009-06-12 20:14:08 +00002211 def test_newlines_output(self):
2212 testdict = {
2213 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2214 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2215 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2216 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2217 }
2218 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2219 for newline, expected in tests:
2220 buf = self.BytesIO()
2221 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2222 txt.write("AAA\nB")
2223 txt.write("BB\nCCC\n")
2224 txt.write("X\rY\r\nZ")
2225 txt.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002226 self.assertEqual(buf.closed, False)
2227 self.assertEqual(buf.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00002228
2229 def test_destructor(self):
2230 l = []
2231 base = self.BytesIO
2232 class MyBytesIO(base):
2233 def close(self):
2234 l.append(self.getvalue())
2235 base.close(self)
2236 b = MyBytesIO()
2237 t = self.TextIOWrapper(b, encoding="ascii")
2238 t.write("abc")
2239 del t
2240 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00002241 self.assertEqual([b"abc"], l)
Antoine Pitrou19690592009-06-12 20:14:08 +00002242
2243 def test_override_destructor(self):
2244 record = []
2245 class MyTextIO(self.TextIOWrapper):
2246 def __del__(self):
2247 record.append(1)
2248 try:
2249 f = super(MyTextIO, self).__del__
2250 except AttributeError:
2251 pass
2252 else:
2253 f()
2254 def close(self):
2255 record.append(2)
2256 super(MyTextIO, self).close()
2257 def flush(self):
2258 record.append(3)
2259 super(MyTextIO, self).flush()
2260 b = self.BytesIO()
2261 t = MyTextIO(b, encoding="ascii")
2262 del t
2263 support.gc_collect()
2264 self.assertEqual(record, [1, 2, 3])
2265
2266 def test_error_through_destructor(self):
2267 # Test that the exception state is not modified by a destructor,
2268 # even if close() fails.
2269 rawio = self.CloseFailureIO()
2270 def f():
2271 self.TextIOWrapper(rawio).xyzzy
2272 with support.captured_output("stderr") as s:
2273 self.assertRaises(AttributeError, f)
2274 s = s.getvalue().strip()
2275 if s:
2276 # The destructor *may* have printed an unraisable error, check it
2277 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002278 self.assertTrue(s.startswith("Exception IOError: "), s)
2279 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002280
2281 # Systematic tests of the text I/O API
2282
Antoine Pitrou19690592009-06-12 20:14:08 +00002283 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002284 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2285 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00002286 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002287 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002288 self.assertEqual(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002289 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002290 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002291 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002292 self.assertEqual(f.tell(), 0)
2293 self.assertEqual(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002294 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002295 self.assertEqual(f.seek(0), 0)
2296 self.assertEqual(f.read(None), "abc")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002297 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002298 self.assertEqual(f.read(2), "ab")
2299 self.assertEqual(f.read(1), "c")
2300 self.assertEqual(f.read(1), "")
2301 self.assertEqual(f.read(), "")
2302 self.assertEqual(f.tell(), cookie)
2303 self.assertEqual(f.seek(0), 0)
2304 self.assertEqual(f.seek(0, 2), cookie)
2305 self.assertEqual(f.write("def"), 3)
2306 self.assertEqual(f.seek(cookie), cookie)
2307 self.assertEqual(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002308 if enc.startswith("utf"):
2309 self.multi_line_test(f, enc)
2310 f.close()
2311
2312 def multi_line_test(self, f, enc):
2313 f.seek(0)
2314 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00002315 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002316 wlines = []
2317 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2318 chars = []
2319 for i in range(size):
2320 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00002321 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002322 wlines.append((f.tell(), line))
2323 f.write(line)
2324 f.seek(0)
2325 rlines = []
2326 while True:
2327 pos = f.tell()
2328 line = f.readline()
2329 if not line:
2330 break
2331 rlines.append((pos, line))
Ezio Melotti2623a372010-11-21 13:34:58 +00002332 self.assertEqual(rlines, wlines)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002333
Antoine Pitrou19690592009-06-12 20:14:08 +00002334 def test_telling(self):
2335 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002336 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002337 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002338 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002339 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002340 p2 = f.tell()
2341 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002342 self.assertEqual(f.tell(), p0)
2343 self.assertEqual(f.readline(), "\xff\n")
2344 self.assertEqual(f.tell(), p1)
2345 self.assertEqual(f.readline(), "\xff\n")
2346 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002347 f.seek(0)
2348 for line in f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002349 self.assertEqual(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002350 self.assertRaises(IOError, f.tell)
Ezio Melotti2623a372010-11-21 13:34:58 +00002351 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002352 f.close()
2353
Antoine Pitrou19690592009-06-12 20:14:08 +00002354 def test_seeking(self):
2355 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00002356 prefix_size = chunk_size - 2
2357 u_prefix = "a" * prefix_size
2358 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melotti2623a372010-11-21 13:34:58 +00002359 self.assertEqual(len(u_prefix), len(prefix))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002360 u_suffix = "\u8888\n"
2361 suffix = bytes(u_suffix.encode("utf-8"))
2362 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00002363 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002364 f.write(line*2)
2365 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002366 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002367 s = f.read(prefix_size)
Ezio Melotti2623a372010-11-21 13:34:58 +00002368 self.assertEqual(s, prefix.decode("ascii"))
2369 self.assertEqual(f.tell(), prefix_size)
2370 self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002371
Antoine Pitrou19690592009-06-12 20:14:08 +00002372 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002373 # Regression test for a specific bug
2374 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00002375 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002376 f.write(data)
2377 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002378 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002379 f._CHUNK_SIZE # Just test that it exists
2380 f._CHUNK_SIZE = 2
2381 f.readline()
2382 f.tell()
2383
Antoine Pitrou19690592009-06-12 20:14:08 +00002384 def test_seek_and_tell(self):
2385 #Test seek/tell using the StatefulIncrementalDecoder.
2386 # Make test faster by doing smaller seeks
2387 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00002388
Antoine Pitrou19690592009-06-12 20:14:08 +00002389 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002390 """Tell/seek to various points within a data stream and ensure
2391 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00002392 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00002393 f.write(data)
2394 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002395 f = self.open(support.TESTFN, encoding='test_decoder')
2396 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00002397 decoded = f.read()
2398 f.close()
2399
2400 for i in range(min_pos, len(decoded) + 1): # seek positions
2401 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00002402 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melotti2623a372010-11-21 13:34:58 +00002403 self.assertEqual(f.read(i), decoded[:i])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002404 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002405 self.assertEqual(f.read(j), decoded[i:i + j])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002406 f.seek(cookie)
Ezio Melotti2623a372010-11-21 13:34:58 +00002407 self.assertEqual(f.read(), decoded[i:])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002408 f.close()
2409
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002410 # Enable the test decoder.
2411 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00002412
2413 # Run the tests.
2414 try:
2415 # Try each test case.
2416 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00002417 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002418
2419 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00002420 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2421 offset = CHUNK_SIZE - len(input)//2
2422 prefix = b'.'*offset
2423 # Don't bother seeking into the prefix (takes too long).
2424 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00002425 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002426
2427 # Ensure our test decoder won't interfere with subsequent tests.
2428 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002429 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002430
Antoine Pitrou19690592009-06-12 20:14:08 +00002431 def test_encoded_writes(self):
2432 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002433 tests = ("utf-16",
2434 "utf-16-le",
2435 "utf-16-be",
2436 "utf-32",
2437 "utf-32-le",
2438 "utf-32-be")
2439 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002440 buf = self.BytesIO()
2441 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002442 # Check if the BOM is written only once (see issue1753).
2443 f.write(data)
2444 f.write(data)
2445 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002446 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002447 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002448 self.assertEqual(f.read(), data * 2)
2449 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002450
Antoine Pitrou19690592009-06-12 20:14:08 +00002451 def test_unreadable(self):
2452 class UnReadable(self.BytesIO):
2453 def readable(self):
2454 return False
2455 txt = self.TextIOWrapper(UnReadable())
2456 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002457
Antoine Pitrou19690592009-06-12 20:14:08 +00002458 def test_read_one_by_one(self):
2459 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002460 reads = ""
2461 while True:
2462 c = txt.read(1)
2463 if not c:
2464 break
2465 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002466 self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002467
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002468 def test_readlines(self):
2469 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2470 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2471 txt.seek(0)
2472 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2473 txt.seek(0)
2474 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2475
Christian Heimes1a6387e2008-03-26 12:49:49 +00002476 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002477 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002478 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002479 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002480 reads = ""
2481 while True:
2482 c = txt.read(128)
2483 if not c:
2484 break
2485 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002486 self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002487
Antoine Pitroueadca1d2012-10-16 23:02:27 +02002488 def test_writelines(self):
2489 l = ['ab', 'cd', 'ef']
2490 buf = self.BytesIO()
2491 txt = self.TextIOWrapper(buf)
2492 txt.writelines(l)
2493 txt.flush()
2494 self.assertEqual(buf.getvalue(), b'abcdef')
2495
2496 def test_writelines_userlist(self):
2497 l = UserList(['ab', 'cd', 'ef'])
2498 buf = self.BytesIO()
2499 txt = self.TextIOWrapper(buf)
2500 txt.writelines(l)
2501 txt.flush()
2502 self.assertEqual(buf.getvalue(), b'abcdef')
2503
2504 def test_writelines_error(self):
2505 txt = self.TextIOWrapper(self.BytesIO())
2506 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2507 self.assertRaises(TypeError, txt.writelines, None)
2508 self.assertRaises(TypeError, txt.writelines, b'abc')
2509
Christian Heimes1a6387e2008-03-26 12:49:49 +00002510 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002511 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002512
2513 # read one char at a time
2514 reads = ""
2515 while True:
2516 c = txt.read(1)
2517 if not c:
2518 break
2519 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002520 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002521
2522 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002523 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002524 txt._CHUNK_SIZE = 4
2525
2526 reads = ""
2527 while True:
2528 c = txt.read(4)
2529 if not c:
2530 break
2531 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002532 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002533
2534 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002535 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002536 txt._CHUNK_SIZE = 4
2537
2538 reads = txt.read(4)
2539 reads += txt.read(4)
2540 reads += txt.readline()
2541 reads += txt.readline()
2542 reads += txt.readline()
Ezio Melotti2623a372010-11-21 13:34:58 +00002543 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002544
2545 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002546 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002547 txt._CHUNK_SIZE = 4
2548
2549 reads = txt.read(4)
2550 reads += txt.read()
Ezio Melotti2623a372010-11-21 13:34:58 +00002551 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002552
2553 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002554 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002555 txt._CHUNK_SIZE = 4
2556
2557 reads = txt.read(4)
2558 pos = txt.tell()
2559 txt.seek(0)
2560 txt.seek(pos)
Ezio Melotti2623a372010-11-21 13:34:58 +00002561 self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002562
2563 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002564 buffer = self.BytesIO(self.testdata)
2565 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002566
2567 self.assertEqual(buffer.seekable(), txt.seekable())
2568
Antoine Pitrou19690592009-06-12 20:14:08 +00002569 def test_append_bom(self):
2570 # The BOM is not written again when appending to a non-empty file
2571 filename = support.TESTFN
2572 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2573 with self.open(filename, 'w', encoding=charset) as f:
2574 f.write('aaa')
2575 pos = f.tell()
2576 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002577 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002578
2579 with self.open(filename, 'a', encoding=charset) as f:
2580 f.write('xxx')
2581 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002582 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002583
Antoine Pitrou19690592009-06-12 20:14:08 +00002584 def test_seek_bom(self):
2585 # Same test, but when seeking manually
2586 filename = support.TESTFN
2587 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2588 with self.open(filename, 'w', encoding=charset) as f:
2589 f.write('aaa')
2590 pos = f.tell()
2591 with self.open(filename, 'r+', encoding=charset) as f:
2592 f.seek(pos)
2593 f.write('zzz')
2594 f.seek(0)
2595 f.write('bbb')
2596 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002597 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002598
2599 def test_errors_property(self):
2600 with self.open(support.TESTFN, "w") as f:
2601 self.assertEqual(f.errors, "strict")
2602 with self.open(support.TESTFN, "w", errors="replace") as f:
2603 self.assertEqual(f.errors, "replace")
2604
Victor Stinner6a102812010-04-27 23:55:59 +00002605 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002606 def test_threads_write(self):
2607 # Issue6750: concurrent writes could duplicate data
2608 event = threading.Event()
2609 with self.open(support.TESTFN, "w", buffering=1) as f:
2610 def run(n):
2611 text = "Thread%03d\n" % n
2612 event.wait()
2613 f.write(text)
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03002614 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002615 for x in range(20)]
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03002616 with support.start_threads(threads, event.set):
2617 time.sleep(0.02)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002618 with self.open(support.TESTFN) as f:
2619 content = f.read()
2620 for n in range(20):
Ezio Melotti2623a372010-11-21 13:34:58 +00002621 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002622
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002623 def test_flush_error_on_close(self):
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002624 # Test that text file is closed despite failed flush
2625 # and that flush() is called before file closed.
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002626 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002627 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002628 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002629 closed[:] = [txt.closed, txt.buffer.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002630 raise IOError()
2631 txt.flush = bad_flush
2632 self.assertRaises(IOError, txt.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -06002633 self.assertTrue(txt.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002634 self.assertTrue(txt.buffer.closed)
2635 self.assertTrue(closed) # flush() called
2636 self.assertFalse(closed[0]) # flush() called before file closed
2637 self.assertFalse(closed[1])
Serhiy Storchaka437d5352015-02-23 00:28:38 +02002638 txt.flush = lambda: None # break reference loop
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002639
2640 def test_multi_close(self):
2641 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2642 txt.close()
2643 txt.close()
2644 txt.close()
2645 self.assertRaises(ValueError, txt.flush)
2646
Antoine Pitroufc9ead62010-12-21 21:26:55 +00002647 def test_readonly_attributes(self):
2648 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2649 buf = self.BytesIO(self.testdata)
2650 with self.assertRaises((AttributeError, TypeError)):
2651 txt.buffer = buf
2652
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002653 def test_read_nonbytes(self):
2654 # Issue #17106
2655 # Crash when underlying read() returns non-bytes
2656 class NonbytesStream(self.StringIO):
2657 read1 = self.StringIO.read
2658 class NonbytesStream(self.StringIO):
2659 read1 = self.StringIO.read
2660 t = self.TextIOWrapper(NonbytesStream('a'))
2661 with self.maybeRaises(TypeError):
2662 t.read(1)
2663 t = self.TextIOWrapper(NonbytesStream('a'))
2664 with self.maybeRaises(TypeError):
2665 t.readline()
2666 t = self.TextIOWrapper(NonbytesStream('a'))
2667 self.assertEqual(t.read(), u'a')
2668
Oren Milman30537692017-11-07 02:17:54 +02002669 def test_illegal_encoder(self):
2670 # bpo-31271: A TypeError should be raised in case the return value of
2671 # encoder's encode() is invalid.
2672 class BadEncoder:
2673 def encode(self, dummy):
2674 return u'spam'
2675 def get_bad_encoder(dummy):
2676 return BadEncoder()
2677 rot13 = codecs.lookup("rot13")
2678 with support.swap_attr(rot13, '_is_text_encoding', True), \
2679 support.swap_attr(rot13, 'incrementalencoder', get_bad_encoder):
2680 t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
2681 with self.assertRaises(TypeError):
2682 t.write('bar')
2683 t.flush()
2684
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002685 def test_illegal_decoder(self):
2686 # Issue #17106
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002687 # Bypass the early encoding check added in issue 20404
2688 def _make_illegal_wrapper():
2689 quopri = codecs.lookup("quopri_codec")
2690 quopri._is_text_encoding = True
2691 try:
2692 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
2693 newline='\n', encoding="quopri_codec")
2694 finally:
2695 quopri._is_text_encoding = False
2696 return t
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002697 # Crash when decoder returns non-string
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002698 with support.check_py3k_warnings():
2699 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2700 encoding='quopri_codec')
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002701 with self.maybeRaises(TypeError):
2702 t.read(1)
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002703 with support.check_py3k_warnings():
2704 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2705 encoding='quopri_codec')
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002706 with self.maybeRaises(TypeError):
2707 t.readline()
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002708 with support.check_py3k_warnings():
2709 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2710 encoding='quopri_codec')
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002711 with self.maybeRaises(TypeError):
2712 t.read()
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002713 #else:
2714 #t = _make_illegal_wrapper()
2715 #self.assertRaises(TypeError, t.read, 1)
2716 #t = _make_illegal_wrapper()
2717 #self.assertRaises(TypeError, t.readline)
2718 #t = _make_illegal_wrapper()
2719 #self.assertRaises(TypeError, t.read)
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002720
Oren Milman20958e62017-08-29 19:16:12 +03002721 # Issue 31243: calling read() while the return value of decoder's
2722 # getstate() is invalid should neither crash the interpreter nor
2723 # raise a SystemError.
2724 def _make_very_illegal_wrapper(getstate_ret_val):
2725 class BadDecoder:
2726 def getstate(self):
2727 return getstate_ret_val
2728 def _get_bad_decoder(dummy):
2729 return BadDecoder()
2730 quopri = codecs.lookup("quopri_codec")
2731 with support.swap_attr(quopri, 'incrementaldecoder',
2732 _get_bad_decoder):
2733 return _make_illegal_wrapper()
2734 t = _make_very_illegal_wrapper(42)
2735 with self.maybeRaises(TypeError):
2736 t.read(42)
2737 t = _make_very_illegal_wrapper(())
2738 with self.maybeRaises(TypeError):
2739 t.read(42)
2740
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002741
Antoine Pitrou19690592009-06-12 20:14:08 +00002742class CTextIOWrapperTest(TextIOWrapperTest):
2743
2744 def test_initialization(self):
2745 r = self.BytesIO(b"\xc3\xa9\n\n")
2746 b = self.BufferedReader(r, 1000)
2747 t = self.TextIOWrapper(b)
2748 self.assertRaises(TypeError, t.__init__, b, newline=42)
2749 self.assertRaises(ValueError, t.read)
2750 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2751 self.assertRaises(ValueError, t.read)
2752
Benjamin Peterson53ae6142014-12-21 20:51:50 -06002753 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2754 self.assertRaises(Exception, repr, t)
2755
Antoine Pitrou19690592009-06-12 20:14:08 +00002756 def test_garbage_collection(self):
2757 # C TextIOWrapper objects are collected, and collecting them flushes
2758 # all data to disk.
2759 # The Python version has __del__, so it ends in gc.garbage instead.
2760 rawio = io.FileIO(support.TESTFN, "wb")
2761 b = self.BufferedWriter(rawio)
2762 t = self.TextIOWrapper(b, encoding="ascii")
2763 t.write("456def")
2764 t.x = t
2765 wr = weakref.ref(t)
2766 del t
2767 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03002768 self.assertIsNone(wr(), wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002769 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002770 self.assertEqual(f.read(), b"456def")
2771
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002772 def test_rwpair_cleared_before_textio(self):
2773 # Issue 13070: TextIOWrapper's finalization would crash when called
2774 # after the reference to the underlying BufferedRWPair's writer got
2775 # cleared by the GC.
2776 for i in range(1000):
2777 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2778 t1 = self.TextIOWrapper(b1, encoding="ascii")
2779 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2780 t2 = self.TextIOWrapper(b2, encoding="ascii")
2781 # circular references
2782 t1.buddy = t2
2783 t2.buddy = t1
2784 support.gc_collect()
2785
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002786 maybeRaises = unittest.TestCase.assertRaises
2787
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002788
Antoine Pitrou19690592009-06-12 20:14:08 +00002789class PyTextIOWrapperTest(TextIOWrapperTest):
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002790 @contextlib.contextmanager
2791 def maybeRaises(self, *args, **kwds):
2792 yield
Antoine Pitrou19690592009-06-12 20:14:08 +00002793
2794
2795class IncrementalNewlineDecoderTest(unittest.TestCase):
2796
2797 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002798 # UTF-8 specific tests for a newline decoder
2799 def _check_decode(b, s, **kwargs):
2800 # We exercise getstate() / setstate() as well as decode()
2801 state = decoder.getstate()
Ezio Melotti2623a372010-11-21 13:34:58 +00002802 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002803 decoder.setstate(state)
Ezio Melotti2623a372010-11-21 13:34:58 +00002804 self.assertEqual(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002805
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002806 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002807
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002808 _check_decode(b'\xe8', "")
2809 _check_decode(b'\xa2', "")
2810 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002811
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002812 _check_decode(b'\xe8', "")
2813 _check_decode(b'\xa2', "")
2814 _check_decode(b'\x88', "\u8888")
2815
2816 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002817 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2818
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002819 decoder.reset()
2820 _check_decode(b'\n', "\n")
2821 _check_decode(b'\r', "")
2822 _check_decode(b'', "\n", final=True)
2823 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002824
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002825 _check_decode(b'\r', "")
2826 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002827
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002828 _check_decode(b'\r\r\n', "\n\n")
2829 _check_decode(b'\r', "")
2830 _check_decode(b'\r', "\n")
2831 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002832
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002833 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2834 _check_decode(b'\xe8\xa2\x88', "\u8888")
2835 _check_decode(b'\n', "\n")
2836 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2837 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002838
Antoine Pitrou19690592009-06-12 20:14:08 +00002839 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002840 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002841 if encoding is not None:
2842 encoder = codecs.getincrementalencoder(encoding)()
2843 def _decode_bytewise(s):
2844 # Decode one byte at a time
2845 for b in encoder.encode(s):
2846 result.append(decoder.decode(b))
2847 else:
2848 encoder = None
2849 def _decode_bytewise(s):
2850 # Decode one char at a time
2851 for c in s:
2852 result.append(decoder.decode(c))
Ezio Melotti2623a372010-11-21 13:34:58 +00002853 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002854 _decode_bytewise("abc\n\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002855 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002856 _decode_bytewise("\nabc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002857 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002858 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002859 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002860 _decode_bytewise("abc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002861 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002862 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002863 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002864 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002865 input = "abc"
2866 if encoder is not None:
2867 encoder.reset()
2868 input = encoder.encode(input)
Ezio Melotti2623a372010-11-21 13:34:58 +00002869 self.assertEqual(decoder.decode(input), "abc")
2870 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002871
2872 def test_newline_decoder(self):
2873 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002874 # None meaning the IncrementalNewlineDecoder takes unicode input
2875 # rather than bytes input
2876 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002877 'utf-16', 'utf-16-le', 'utf-16-be',
2878 'utf-32', 'utf-32-le', 'utf-32-be',
2879 )
2880 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002881 decoder = enc and codecs.getincrementaldecoder(enc)()
2882 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2883 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002884 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002885 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2886 self.check_newline_decoding_utf8(decoder)
2887
2888 def test_newline_bytes(self):
2889 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2890 def _check(dec):
Ezio Melotti2623a372010-11-21 13:34:58 +00002891 self.assertEqual(dec.newlines, None)
2892 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2893 self.assertEqual(dec.newlines, None)
2894 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2895 self.assertEqual(dec.newlines, None)
Antoine Pitrou19690592009-06-12 20:14:08 +00002896 dec = self.IncrementalNewlineDecoder(None, translate=False)
2897 _check(dec)
2898 dec = self.IncrementalNewlineDecoder(None, translate=True)
2899 _check(dec)
2900
2901class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2902 pass
2903
2904class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2905 pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002906
Christian Heimes1a6387e2008-03-26 12:49:49 +00002907
2908# XXX Tests for open()
2909
2910class MiscIOTest(unittest.TestCase):
2911
Benjamin Petersonad100c32008-11-20 22:06:22 +00002912 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002913 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002914
Antoine Pitrou19690592009-06-12 20:14:08 +00002915 def test___all__(self):
2916 for name in self.io.__all__:
2917 obj = getattr(self.io, name, None)
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03002918 self.assertIsNotNone(obj, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002919 if name == "open":
2920 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002921 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002922 self.assertTrue(issubclass(obj, Exception), name)
2923 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002924 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002925
Benjamin Petersonad100c32008-11-20 22:06:22 +00002926 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002927 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002928 self.assertEqual(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002929 f.close()
2930
Antoine Pitrou19690592009-06-12 20:14:08 +00002931 f = self.open(support.TESTFN, "U")
Ezio Melotti2623a372010-11-21 13:34:58 +00002932 self.assertEqual(f.name, support.TESTFN)
2933 self.assertEqual(f.buffer.name, support.TESTFN)
2934 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2935 self.assertEqual(f.mode, "U")
2936 self.assertEqual(f.buffer.mode, "rb")
2937 self.assertEqual(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002938 f.close()
2939
Antoine Pitrou19690592009-06-12 20:14:08 +00002940 f = self.open(support.TESTFN, "w+")
Ezio Melotti2623a372010-11-21 13:34:58 +00002941 self.assertEqual(f.mode, "w+")
2942 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2943 self.assertEqual(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002944
Antoine Pitrou19690592009-06-12 20:14:08 +00002945 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melotti2623a372010-11-21 13:34:58 +00002946 self.assertEqual(g.mode, "wb")
2947 self.assertEqual(g.raw.mode, "wb")
2948 self.assertEqual(g.name, f.fileno())
2949 self.assertEqual(g.raw.name, f.fileno())
Benjamin Petersonad100c32008-11-20 22:06:22 +00002950 f.close()
2951 g.close()
2952
Antoine Pitrou19690592009-06-12 20:14:08 +00002953 def test_io_after_close(self):
2954 for kwargs in [
2955 {"mode": "w"},
2956 {"mode": "wb"},
2957 {"mode": "w", "buffering": 1},
2958 {"mode": "w", "buffering": 2},
2959 {"mode": "wb", "buffering": 0},
2960 {"mode": "r"},
2961 {"mode": "rb"},
2962 {"mode": "r", "buffering": 1},
2963 {"mode": "r", "buffering": 2},
2964 {"mode": "rb", "buffering": 0},
2965 {"mode": "w+"},
2966 {"mode": "w+b"},
2967 {"mode": "w+", "buffering": 1},
2968 {"mode": "w+", "buffering": 2},
2969 {"mode": "w+b", "buffering": 0},
2970 ]:
2971 f = self.open(support.TESTFN, **kwargs)
2972 f.close()
2973 self.assertRaises(ValueError, f.flush)
2974 self.assertRaises(ValueError, f.fileno)
2975 self.assertRaises(ValueError, f.isatty)
2976 self.assertRaises(ValueError, f.__iter__)
2977 if hasattr(f, "peek"):
2978 self.assertRaises(ValueError, f.peek, 1)
2979 self.assertRaises(ValueError, f.read)
2980 if hasattr(f, "read1"):
2981 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinner5100a402011-05-25 22:15:36 +02002982 if hasattr(f, "readall"):
2983 self.assertRaises(ValueError, f.readall)
Antoine Pitrou19690592009-06-12 20:14:08 +00002984 if hasattr(f, "readinto"):
2985 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2986 self.assertRaises(ValueError, f.readline)
2987 self.assertRaises(ValueError, f.readlines)
Xiang Zhang5fbdfc32017-04-15 13:18:22 +08002988 self.assertRaises(ValueError, f.readlines, 1)
Antoine Pitrou19690592009-06-12 20:14:08 +00002989 self.assertRaises(ValueError, f.seek, 0)
2990 self.assertRaises(ValueError, f.tell)
2991 self.assertRaises(ValueError, f.truncate)
2992 self.assertRaises(ValueError, f.write,
2993 b"" if "b" in kwargs['mode'] else "")
2994 self.assertRaises(ValueError, f.writelines, [])
2995 self.assertRaises(ValueError, next, f)
2996
2997 def test_blockingioerror(self):
2998 # Various BlockingIOError issues
2999 self.assertRaises(TypeError, self.BlockingIOError)
3000 self.assertRaises(TypeError, self.BlockingIOError, 1)
3001 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
3002 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
3003 b = self.BlockingIOError(1, "")
3004 self.assertEqual(b.characters_written, 0)
3005 class C(unicode):
3006 pass
3007 c = C("")
3008 b = self.BlockingIOError(1, c)
3009 c.b = b
3010 b.c = c
3011 wr = weakref.ref(c)
3012 del c, b
3013 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03003014 self.assertIsNone(wr(), wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00003015
3016 def test_abcs(self):
3017 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00003018 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3019 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3020 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3021 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00003022
3023 def _check_abc_inheritance(self, abcmodule):
3024 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00003025 self.assertIsInstance(f, abcmodule.IOBase)
3026 self.assertIsInstance(f, abcmodule.RawIOBase)
3027 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3028 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00003029 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00003030 self.assertIsInstance(f, abcmodule.IOBase)
3031 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3032 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3033 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00003034 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00003035 self.assertIsInstance(f, abcmodule.IOBase)
3036 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3037 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3038 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00003039
3040 def test_abc_inheritance(self):
3041 # Test implementations inherit from their respective ABCs
3042 self._check_abc_inheritance(self)
3043
3044 def test_abc_inheritance_official(self):
3045 # Test implementations inherit from the official ABCs of the
3046 # baseline "io" module.
3047 self._check_abc_inheritance(io)
3048
Antoine Pitrou5aa7df32011-11-21 20:16:44 +01003049 @unittest.skipUnless(fcntl, 'fcntl required for this test')
3050 def test_nonblock_pipe_write_bigbuf(self):
3051 self._test_nonblock_pipe_write(16*1024)
3052
3053 @unittest.skipUnless(fcntl, 'fcntl required for this test')
3054 def test_nonblock_pipe_write_smallbuf(self):
3055 self._test_nonblock_pipe_write(1024)
3056
3057 def _set_non_blocking(self, fd):
3058 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
3059 self.assertNotEqual(flags, -1)
3060 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
3061 self.assertEqual(res, 0)
3062
3063 def _test_nonblock_pipe_write(self, bufsize):
3064 sent = []
3065 received = []
3066 r, w = os.pipe()
3067 self._set_non_blocking(r)
3068 self._set_non_blocking(w)
3069
3070 # To exercise all code paths in the C implementation we need
3071 # to play with buffer sizes. For instance, if we choose a
3072 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3073 # then we will never get a partial write of the buffer.
3074 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3075 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3076
3077 with rf, wf:
3078 for N in 9999, 73, 7574:
3079 try:
3080 i = 0
3081 while True:
3082 msg = bytes([i % 26 + 97]) * N
3083 sent.append(msg)
3084 wf.write(msg)
3085 i += 1
3086
3087 except self.BlockingIOError as e:
3088 self.assertEqual(e.args[0], errno.EAGAIN)
3089 sent[-1] = sent[-1][:e.characters_written]
3090 received.append(rf.read())
3091 msg = b'BLOCKED'
3092 wf.write(msg)
3093 sent.append(msg)
3094
3095 while True:
3096 try:
3097 wf.flush()
3098 break
3099 except self.BlockingIOError as e:
3100 self.assertEqual(e.args[0], errno.EAGAIN)
3101 self.assertEqual(e.characters_written, 0)
3102 received.append(rf.read())
3103
3104 received += iter(rf.read, None)
3105
3106 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03003107 self.assertEqual(sent, received)
Antoine Pitrou5aa7df32011-11-21 20:16:44 +01003108 self.assertTrue(wf.closed)
3109 self.assertTrue(rf.closed)
3110
Antoine Pitrou19690592009-06-12 20:14:08 +00003111class CMiscIOTest(MiscIOTest):
3112 io = io
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03003113 shutdown_error = "RuntimeError: could not find io module state"
Antoine Pitrou19690592009-06-12 20:14:08 +00003114
3115class PyMiscIOTest(MiscIOTest):
3116 io = pyio
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03003117 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Petersonad100c32008-11-20 22:06:22 +00003118
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003119
3120@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3121class SignalsTest(unittest.TestCase):
3122
3123 def setUp(self):
3124 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3125
3126 def tearDown(self):
3127 signal.signal(signal.SIGALRM, self.oldalrm)
3128
3129 def alarm_interrupt(self, sig, frame):
Florent Xicluna3fa3b002010-09-13 08:20:19 +00003130 1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003131
3132 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinner49d495f2011-07-04 11:44:46 +02003133 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
3134 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003135 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3136 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou6439c002011-02-25 21:35:47 +00003137 invokes the signal handler, and bubbles up the exception raised
3138 in the latter."""
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003139 read_results = []
3140 def _read():
3141 s = os.read(r, 1)
3142 read_results.append(s)
3143 t = threading.Thread(target=_read)
3144 t.daemon = True
3145 r, w = os.pipe()
3146 try:
3147 wio = self.io.open(w, **fdopen_kwargs)
3148 t.start()
3149 signal.alarm(1)
3150 # Fill the pipe enough that the write will be blocking.
3151 # It will be interrupted by the timer armed above. Since the
3152 # other thread has read one byte, the low-level write will
3153 # return with a successful (partial) result rather than an EINTR.
3154 # The buffered IO layer must check for pending signal
3155 # handlers, which in this case will invoke alarm_interrupt().
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03003156 try:
3157 with self.assertRaises(ZeroDivisionError):
3158 wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1))
3159 finally:
3160 t.join()
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003161 # We got one byte, get another one and check that it isn't a
3162 # repeat of the first one.
3163 read_results.append(os.read(r, 1))
3164 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3165 finally:
3166 os.close(w)
3167 os.close(r)
3168 # This is deliberate. If we didn't close the file descriptor
3169 # before closing wio, wio would try to flush its internal
3170 # buffer, and block again.
3171 try:
3172 wio.close()
3173 except IOError as e:
3174 if e.errno != errno.EBADF:
3175 raise
3176
3177 def test_interrupted_write_unbuffered(self):
3178 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3179
3180 def test_interrupted_write_buffered(self):
3181 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3182
3183 def test_interrupted_write_text(self):
3184 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3185
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003186 def check_reentrant_write(self, data, **fdopen_kwargs):
3187 def on_alarm(*args):
3188 # Will be called reentrantly from the same thread
3189 wio.write(data)
Victor Stinner4c41f842011-07-05 13:29:26 +02003190 1//0
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003191 signal.signal(signal.SIGALRM, on_alarm)
3192 r, w = os.pipe()
3193 wio = self.io.open(w, **fdopen_kwargs)
3194 try:
3195 signal.alarm(1)
3196 # Either the reentrant call to wio.write() fails with RuntimeError,
3197 # or the signal handler raises ZeroDivisionError.
3198 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3199 while 1:
3200 for i in range(100):
3201 wio.write(data)
3202 wio.flush()
3203 # Make sure the buffer doesn't fill up and block further writes
3204 os.read(r, len(data) * 100)
3205 exc = cm.exception
3206 if isinstance(exc, RuntimeError):
3207 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3208 finally:
3209 wio.close()
3210 os.close(r)
3211
3212 def test_reentrant_write_buffered(self):
3213 self.check_reentrant_write(b"xy", mode="wb")
3214
3215 def test_reentrant_write_text(self):
3216 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3217
Antoine Pitrou6439c002011-02-25 21:35:47 +00003218 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3219 """Check that a buffered read, when it gets interrupted (either
3220 returning a partial result or EINTR), properly invokes the signal
3221 handler and retries if the latter returned successfully."""
3222 r, w = os.pipe()
3223 fdopen_kwargs["closefd"] = False
3224 def alarm_handler(sig, frame):
3225 os.write(w, b"bar")
3226 signal.signal(signal.SIGALRM, alarm_handler)
3227 try:
3228 rio = self.io.open(r, **fdopen_kwargs)
3229 os.write(w, b"foo")
3230 signal.alarm(1)
3231 # Expected behaviour:
3232 # - first raw read() returns partial b"foo"
3233 # - second raw read() returns EINTR
3234 # - third raw read() returns b"bar"
3235 self.assertEqual(decode(rio.read(6)), "foobar")
3236 finally:
3237 rio.close()
3238 os.close(w)
3239 os.close(r)
3240
3241 def test_interrupterd_read_retry_buffered(self):
3242 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3243 mode="rb")
3244
3245 def test_interrupterd_read_retry_text(self):
3246 self.check_interrupted_read_retry(lambda x: x,
3247 mode="r")
3248
3249 @unittest.skipUnless(threading, 'Threading required for this test.')
3250 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3251 """Check that a buffered write, when it gets interrupted (either
3252 returning a partial result or EINTR), properly invokes the signal
3253 handler and retries if the latter returned successfully."""
3254 select = support.import_module("select")
3255 # A quantity that exceeds the buffer size of an anonymous pipe's
3256 # write end.
Antoine Pitrou68915d72013-04-24 23:31:38 +02003257 N = support.PIPE_MAX_SIZE
Antoine Pitrou6439c002011-02-25 21:35:47 +00003258 r, w = os.pipe()
3259 fdopen_kwargs["closefd"] = False
3260 # We need a separate thread to read from the pipe and allow the
3261 # write() to finish. This thread is started after the SIGALRM is
3262 # received (forcing a first EINTR in write()).
3263 read_results = []
3264 write_finished = False
Serhiy Storchaka53ea1622015-03-28 20:38:48 +02003265 error = [None]
Antoine Pitrou6439c002011-02-25 21:35:47 +00003266 def _read():
Serhiy Storchaka53ea1622015-03-28 20:38:48 +02003267 try:
3268 while not write_finished:
3269 while r in select.select([r], [], [], 1.0)[0]:
3270 s = os.read(r, 1024)
3271 read_results.append(s)
3272 except BaseException as exc:
3273 error[0] = exc
Antoine Pitrou6439c002011-02-25 21:35:47 +00003274 t = threading.Thread(target=_read)
3275 t.daemon = True
3276 def alarm1(sig, frame):
3277 signal.signal(signal.SIGALRM, alarm2)
3278 signal.alarm(1)
3279 def alarm2(sig, frame):
3280 t.start()
3281 signal.signal(signal.SIGALRM, alarm1)
3282 try:
3283 wio = self.io.open(w, **fdopen_kwargs)
3284 signal.alarm(1)
3285 # Expected behaviour:
3286 # - first raw write() is partial (because of the limited pipe buffer
3287 # and the first alarm)
3288 # - second raw write() returns EINTR (because of the second alarm)
3289 # - subsequent write()s are successful (either partial or complete)
3290 self.assertEqual(N, wio.write(item * N))
3291 wio.flush()
3292 write_finished = True
3293 t.join()
Serhiy Storchaka53ea1622015-03-28 20:38:48 +02003294
3295 self.assertIsNone(error[0])
Antoine Pitrou6439c002011-02-25 21:35:47 +00003296 self.assertEqual(N, sum(len(x) for x in read_results))
3297 finally:
3298 write_finished = True
3299 os.close(w)
3300 os.close(r)
3301 # This is deliberate. If we didn't close the file descriptor
3302 # before closing wio, wio would try to flush its internal
3303 # buffer, and could block (in case of failure).
3304 try:
3305 wio.close()
3306 except IOError as e:
3307 if e.errno != errno.EBADF:
3308 raise
3309
3310 def test_interrupterd_write_retry_buffered(self):
3311 self.check_interrupted_write_retry(b"x", mode="wb")
3312
3313 def test_interrupterd_write_retry_text(self):
3314 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3315
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003316
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003317class CSignalsTest(SignalsTest):
3318 io = io
3319
3320class PySignalsTest(SignalsTest):
3321 io = pyio
3322
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003323 # Handling reentrancy issues would slow down _pyio even more, so the
3324 # tests are disabled.
3325 test_reentrant_write_buffered = None
3326 test_reentrant_write_text = None
3327
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003328
Christian Heimes1a6387e2008-03-26 12:49:49 +00003329def test_main():
Antoine Pitrou19690592009-06-12 20:14:08 +00003330 tests = (CIOTest, PyIOTest,
3331 CBufferedReaderTest, PyBufferedReaderTest,
3332 CBufferedWriterTest, PyBufferedWriterTest,
3333 CBufferedRWPairTest, PyBufferedRWPairTest,
3334 CBufferedRandomTest, PyBufferedRandomTest,
3335 StatefulIncrementalDecoderTest,
3336 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3337 CTextIOWrapperTest, PyTextIOWrapperTest,
3338 CMiscIOTest, PyMiscIOTest,
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003339 CSignalsTest, PySignalsTest,
Antoine Pitrou19690592009-06-12 20:14:08 +00003340 )
3341
3342 # Put the namespaces of the IO module we are testing and some useful mock
3343 # classes in the __dict__ of each test.
3344 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou6391b342010-09-14 18:48:19 +00003345 MockNonBlockWriterIO, MockRawIOWithoutRead)
Antoine Pitrou19690592009-06-12 20:14:08 +00003346 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3347 c_io_ns = dict((name, getattr(io, name)) for name in all_members)
3348 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
3349 globs = globals()
3350 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3351 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3352 # Avoid turning open into a bound method.
3353 py_io_ns["open"] = pyio.OpenWrapper
3354 for test in tests:
3355 if test.__name__.startswith("C"):
3356 for name, obj in c_io_ns.items():
3357 setattr(test, name, obj)
3358 elif test.__name__.startswith("Py"):
3359 for name, obj in py_io_ns.items():
3360 setattr(test, name, obj)
3361
3362 support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00003363
3364if __name__ == "__main__":
Antoine Pitrou19690592009-06-12 20:14:08 +00003365 test_main()