blob: 3d4da46113927bab843592f951e2762b08dae960 [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakac72e66a2015-11-02 15:06:09 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Antoine Pitrou19690592009-06-12 20:14:08 +000019# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +030032import warnings
Antoine Pitrou19690592009-06-12 20:14:08 +000033import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000034import signal
35import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000036from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000037from collections import deque
Antoine Pitrou78e761e2012-10-16 22:57:11 +020038from UserList import UserList
Antoine Pitrou19690592009-06-12 20:14:08 +000039from test import test_support as support
Serhiy Storchaka354d50e2013-02-03 17:10:42 +020040import contextlib
Christian Heimes1a6387e2008-03-26 12:49:49 +000041
42import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000043import io # C implementation of io
44import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000045try:
46 import threading
47except ImportError:
48 threading = None
Antoine Pitrou5aa7df32011-11-21 20:16:44 +010049try:
50 import fcntl
51except ImportError:
52 fcntl = None
Antoine Pitrou19690592009-06-12 20:14:08 +000053
54__metaclass__ = type
55bytes = support.py3k_bytes
56
Martin Panterc9813d82016-06-03 05:59:20 +000057def byteslike(*pos, **kw):
58 return memoryview(bytearray(*pos, **kw))
59
Antoine Pitrou19690592009-06-12 20:14:08 +000060def _default_chunk_size():
61 """Get the default TextIOWrapper chunk size"""
62 with io.open(__file__, "r", encoding="latin1") as f:
63 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000064
65
Antoine Pitrou6391b342010-09-14 18:48:19 +000066class MockRawIOWithoutRead:
67 """A RawIO implementation without read(), so as to exercise the default
68 RawIO.read() which calls readinto()."""
Christian Heimes1a6387e2008-03-26 12:49:49 +000069
70 def __init__(self, read_stack=()):
71 self._read_stack = list(read_stack)
72 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +000073 self._reads = 0
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000074 self._extraneous_reads = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +000075
Christian Heimes1a6387e2008-03-26 12:49:49 +000076 def write(self, b):
Antoine Pitrou19690592009-06-12 20:14:08 +000077 self._write_stack.append(bytes(b))
Christian Heimes1a6387e2008-03-26 12:49:49 +000078 return len(b)
79
80 def writable(self):
81 return True
82
83 def fileno(self):
84 return 42
85
86 def readable(self):
87 return True
88
89 def seekable(self):
90 return True
91
92 def seek(self, pos, whence):
Antoine Pitrou19690592009-06-12 20:14:08 +000093 return 0 # wrong but we gotta return something
Christian Heimes1a6387e2008-03-26 12:49:49 +000094
95 def tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +000096 return 0 # same comment as above
97
98 def readinto(self, buf):
99 self._reads += 1
100 max_len = len(buf)
101 try:
102 data = self._read_stack[0]
103 except IndexError:
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000104 self._extraneous_reads += 1
Antoine Pitrou19690592009-06-12 20:14:08 +0000105 return 0
106 if data is None:
107 del self._read_stack[0]
108 return None
109 n = len(data)
110 if len(data) <= max_len:
111 del self._read_stack[0]
112 buf[:n] = data
113 return n
114 else:
115 buf[:] = data[:max_len]
116 self._read_stack[0] = data[max_len:]
117 return max_len
118
119 def truncate(self, pos=None):
120 return pos
121
Antoine Pitrou6391b342010-09-14 18:48:19 +0000122class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
123 pass
124
125class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
126 pass
127
128
129class MockRawIO(MockRawIOWithoutRead):
130
131 def read(self, n=None):
132 self._reads += 1
133 try:
134 return self._read_stack.pop(0)
135 except:
136 self._extraneous_reads += 1
137 return b""
138
Antoine Pitrou19690592009-06-12 20:14:08 +0000139class CMockRawIO(MockRawIO, io.RawIOBase):
140 pass
141
142class PyMockRawIO(MockRawIO, pyio.RawIOBase):
143 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000144
145
Antoine Pitrou19690592009-06-12 20:14:08 +0000146class MisbehavedRawIO(MockRawIO):
147 def write(self, b):
148 return MockRawIO.write(self, b) * 2
149
150 def read(self, n=None):
151 return MockRawIO.read(self, n) * 2
152
153 def seek(self, pos, whence):
154 return -123
155
156 def tell(self):
157 return -456
158
159 def readinto(self, buf):
160 MockRawIO.readinto(self, buf)
161 return len(buf) * 5
162
163class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
164 pass
165
166class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
167 pass
168
169
170class CloseFailureIO(MockRawIO):
171 closed = 0
172
173 def close(self):
174 if not self.closed:
175 self.closed = 1
176 raise IOError
177
178class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
179 pass
180
181class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
182 pass
183
184
185class MockFileIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000186
187 def __init__(self, data):
188 self.read_history = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000189 super(MockFileIO, self).__init__(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000190
191 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +0000192 res = super(MockFileIO, self).read(n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000193 self.read_history.append(None if res is None else len(res))
194 return res
195
Antoine Pitrou19690592009-06-12 20:14:08 +0000196 def readinto(self, b):
197 res = super(MockFileIO, self).readinto(b)
198 self.read_history.append(res)
199 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000200
Antoine Pitrou19690592009-06-12 20:14:08 +0000201class CMockFileIO(MockFileIO, io.BytesIO):
202 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000203
Antoine Pitrou19690592009-06-12 20:14:08 +0000204class PyMockFileIO(MockFileIO, pyio.BytesIO):
205 pass
206
207
208class MockNonBlockWriterIO:
209
210 def __init__(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000211 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000212 self._blocker_char = None
Christian Heimes1a6387e2008-03-26 12:49:49 +0000213
Antoine Pitrou19690592009-06-12 20:14:08 +0000214 def pop_written(self):
215 s = b"".join(self._write_stack)
216 self._write_stack[:] = []
217 return s
218
219 def block_on(self, char):
220 """Block when a given char is encountered."""
221 self._blocker_char = char
222
223 def readable(self):
224 return True
225
226 def seekable(self):
227 return True
Christian Heimes1a6387e2008-03-26 12:49:49 +0000228
229 def writable(self):
230 return True
231
Antoine Pitrou19690592009-06-12 20:14:08 +0000232 def write(self, b):
233 b = bytes(b)
234 n = -1
235 if self._blocker_char:
236 try:
237 n = b.index(self._blocker_char)
238 except ValueError:
239 pass
240 else:
Antoine Pitrou5aa7df32011-11-21 20:16:44 +0100241 if n > 0:
242 # write data up to the first blocker
243 self._write_stack.append(b[:n])
244 return n
245 else:
246 # cancel blocker and indicate would block
247 self._blocker_char = None
248 return None
Antoine Pitrou19690592009-06-12 20:14:08 +0000249 self._write_stack.append(b)
250 return len(b)
251
252class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
253 BlockingIOError = io.BlockingIOError
254
255class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
256 BlockingIOError = pyio.BlockingIOError
257
Christian Heimes1a6387e2008-03-26 12:49:49 +0000258
259class IOTest(unittest.TestCase):
260
Antoine Pitrou19690592009-06-12 20:14:08 +0000261 def setUp(self):
262 support.unlink(support.TESTFN)
263
Christian Heimes1a6387e2008-03-26 12:49:49 +0000264 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000265 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000266
267 def write_ops(self, f):
268 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000269 f.truncate(0)
270 self.assertEqual(f.tell(), 5)
271 f.seek(0)
272
273 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000274 self.assertEqual(f.seek(0), 0)
275 self.assertEqual(f.write(b"Hello."), 6)
276 self.assertEqual(f.tell(), 6)
277 self.assertEqual(f.seek(-1, 1), 5)
278 self.assertEqual(f.tell(), 5)
Martin Panterc9813d82016-06-03 05:59:20 +0000279 buffer = bytearray(b" world\n\n\n")
280 self.assertEqual(f.write(buffer), 9)
281 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Christian Heimes1a6387e2008-03-26 12:49:49 +0000282 self.assertEqual(f.seek(0), 0)
283 self.assertEqual(f.write(b"h"), 1)
284 self.assertEqual(f.seek(-1, 2), 13)
285 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000286
Christian Heimes1a6387e2008-03-26 12:49:49 +0000287 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000288 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000289 self.assertRaises(TypeError, f.seek, 0.0)
290
291 def read_ops(self, f, buffered=False):
292 data = f.read(5)
293 self.assertEqual(data, b"hello")
Martin Panterc9813d82016-06-03 05:59:20 +0000294 data = byteslike(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000295 self.assertEqual(f.readinto(data), 5)
Martin Panterc9813d82016-06-03 05:59:20 +0000296 self.assertEqual(data.tobytes(), b" worl")
297 data = bytearray(5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000298 self.assertEqual(f.readinto(data), 2)
299 self.assertEqual(len(data), 5)
300 self.assertEqual(data[:2], b"d\n")
301 self.assertEqual(f.seek(0), 0)
302 self.assertEqual(f.read(20), b"hello world\n")
303 self.assertEqual(f.read(1), b"")
Martin Panterc9813d82016-06-03 05:59:20 +0000304 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000305 self.assertEqual(f.seek(-6, 2), 6)
306 self.assertEqual(f.read(5), b"world")
307 self.assertEqual(f.read(0), b"")
Martin Panterc9813d82016-06-03 05:59:20 +0000308 self.assertEqual(f.readinto(byteslike()), 0)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000309 self.assertEqual(f.seek(-6, 1), 5)
310 self.assertEqual(f.read(5), b" worl")
311 self.assertEqual(f.tell(), 10)
312 self.assertRaises(TypeError, f.seek, 0.0)
313 if buffered:
314 f.seek(0)
315 self.assertEqual(f.read(), b"hello world\n")
316 f.seek(6)
317 self.assertEqual(f.read(), b"world\n")
318 self.assertEqual(f.read(), b"")
319
320 LARGE = 2**31
321
322 def large_file_ops(self, f):
323 assert f.readable()
324 assert f.writable()
325 self.assertEqual(f.seek(self.LARGE), self.LARGE)
326 self.assertEqual(f.tell(), self.LARGE)
327 self.assertEqual(f.write(b"xxx"), 3)
328 self.assertEqual(f.tell(), self.LARGE + 3)
329 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
330 self.assertEqual(f.truncate(), self.LARGE + 2)
331 self.assertEqual(f.tell(), self.LARGE + 2)
332 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
333 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000334 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000335 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
336 self.assertEqual(f.seek(-1, 2), self.LARGE)
337 self.assertEqual(f.read(2), b"x")
338
Antoine Pitrou19690592009-06-12 20:14:08 +0000339 def test_invalid_operations(self):
340 # Try writing on a file opened in read mode and vice-versa.
341 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000342 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000343 self.assertRaises(IOError, fp.read)
344 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000345 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000346 self.assertRaises(IOError, fp.write, b"blah")
347 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000348 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000349 self.assertRaises(IOError, fp.write, "blah")
350 self.assertRaises(IOError, fp.writelines, ["blah\n"])
351
Serhiy Storchaka3c9ce742016-07-01 23:34:44 +0300352 def test_open_handles_NUL_chars(self):
353 fn_with_NUL = 'foo\0bar'
354 self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
355
356 bytes_fn = fn_with_NUL.encode('ascii')
357 with warnings.catch_warnings():
358 warnings.simplefilter("ignore", DeprecationWarning)
359 self.assertRaises(TypeError, self.open, bytes_fn, 'w')
360
Christian Heimes1a6387e2008-03-26 12:49:49 +0000361 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000362 with self.open(support.TESTFN, "wb", buffering=0) as f:
363 self.assertEqual(f.readable(), False)
364 self.assertEqual(f.writable(), True)
365 self.assertEqual(f.seekable(), True)
366 self.write_ops(f)
367 with self.open(support.TESTFN, "rb", buffering=0) as f:
368 self.assertEqual(f.readable(), True)
369 self.assertEqual(f.writable(), False)
370 self.assertEqual(f.seekable(), True)
371 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000372
373 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000374 with self.open(support.TESTFN, "wb") as f:
375 self.assertEqual(f.readable(), False)
376 self.assertEqual(f.writable(), True)
377 self.assertEqual(f.seekable(), True)
378 self.write_ops(f)
379 with self.open(support.TESTFN, "rb") as f:
380 self.assertEqual(f.readable(), True)
381 self.assertEqual(f.writable(), False)
382 self.assertEqual(f.seekable(), True)
383 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000384
385 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000386 with self.open(support.TESTFN, "wb") as f:
387 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
388 with self.open(support.TESTFN, "rb") as f:
389 self.assertEqual(f.readline(), b"abc\n")
390 self.assertEqual(f.readline(10), b"def\n")
391 self.assertEqual(f.readline(2), b"xy")
392 self.assertEqual(f.readline(4), b"zzy\n")
393 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000394 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000395 self.assertRaises(TypeError, f.readline, 5.3)
396 with self.open(support.TESTFN, "r") as f:
397 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000398
399 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000400 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000401 self.write_ops(f)
402 data = f.getvalue()
403 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000404 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000405 self.read_ops(f, True)
406
407 def test_large_file_ops(self):
408 # On Windows and Mac OSX this test comsumes large resources; It takes
409 # a long time to build the >2GB file and takes >2GB of disk space
410 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000411 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware1f702212013-12-10 14:09:20 -0600412 support.requires(
413 'largefile',
414 'test requires %s bytes and a long time to run' % self.LARGE)
Antoine Pitrou19690592009-06-12 20:14:08 +0000415 with self.open(support.TESTFN, "w+b", 0) as f:
416 self.large_file_ops(f)
417 with self.open(support.TESTFN, "w+b") as f:
418 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000419
420 def test_with_open(self):
421 for bufsize in (0, 1, 100):
422 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000423 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000424 f.write(b"xxx")
425 self.assertEqual(f.closed, True)
426 f = None
427 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000428 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000429 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000430 except ZeroDivisionError:
431 self.assertEqual(f.closed, True)
432 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000433 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000434
Antoine Pitroue741cc62009-01-21 00:45:36 +0000435 # issue 5008
436 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000437 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000438 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000439 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000440 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000441 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000442 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000443 with self.open(support.TESTFN, "a") as f:
Serhiy Storchakaea4d2872015-08-02 15:19:04 +0300444 self.assertGreater(f.tell(), 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000445
Christian Heimes1a6387e2008-03-26 12:49:49 +0000446 def test_destructor(self):
447 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000448 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000449 def __del__(self):
450 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000451 try:
452 f = super(MyFileIO, self).__del__
453 except AttributeError:
454 pass
455 else:
456 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000457 def close(self):
458 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000459 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000460 def flush(self):
461 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000462 super(MyFileIO, self).flush()
463 f = MyFileIO(support.TESTFN, "wb")
464 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000465 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000466 support.gc_collect()
467 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000468 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000469 self.assertEqual(f.read(), b"xxx")
470
471 def _check_base_destructor(self, base):
472 record = []
473 class MyIO(base):
474 def __init__(self):
475 # This exercises the availability of attributes on object
476 # destruction.
477 # (in the C version, close() is called by the tp_dealloc
478 # function, not by __del__)
479 self.on_del = 1
480 self.on_close = 2
481 self.on_flush = 3
482 def __del__(self):
483 record.append(self.on_del)
484 try:
485 f = super(MyIO, self).__del__
486 except AttributeError:
487 pass
488 else:
489 f()
490 def close(self):
491 record.append(self.on_close)
492 super(MyIO, self).close()
493 def flush(self):
494 record.append(self.on_flush)
495 super(MyIO, self).flush()
496 f = MyIO()
497 del f
498 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000499 self.assertEqual(record, [1, 2, 3])
500
Antoine Pitrou19690592009-06-12 20:14:08 +0000501 def test_IOBase_destructor(self):
502 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000503
Antoine Pitrou19690592009-06-12 20:14:08 +0000504 def test_RawIOBase_destructor(self):
505 self._check_base_destructor(self.RawIOBase)
506
507 def test_BufferedIOBase_destructor(self):
508 self._check_base_destructor(self.BufferedIOBase)
509
510 def test_TextIOBase_destructor(self):
511 self._check_base_destructor(self.TextIOBase)
512
513 def test_close_flushes(self):
514 with self.open(support.TESTFN, "wb") as f:
515 f.write(b"xxx")
516 with self.open(support.TESTFN, "rb") as f:
517 self.assertEqual(f.read(), b"xxx")
518
519 def test_array_writes(self):
520 a = array.array(b'i', range(10))
521 n = len(a.tostring())
522 with self.open(support.TESTFN, "wb", 0) as f:
523 self.assertEqual(f.write(a), n)
524 with self.open(support.TESTFN, "wb") as f:
525 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000526
527 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000528 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000529 closefd=False)
530
Antoine Pitrou19690592009-06-12 20:14:08 +0000531 def test_read_closed(self):
532 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000533 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000534 with self.open(support.TESTFN, "r") as f:
535 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000536 self.assertEqual(file.read(), "egg\n")
537 file.seek(0)
538 file.close()
539 self.assertRaises(ValueError, file.read)
540
541 def test_no_closefd_with_filename(self):
542 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000543 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000544
545 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000546 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000547 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000548 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000549 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000550 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000551 self.assertEqual(file.buffer.raw.closefd, False)
552
Antoine Pitrou19690592009-06-12 20:14:08 +0000553 def test_garbage_collection(self):
554 # FileIO objects are collected, and collecting them flushes
555 # all data to disk.
556 f = self.FileIO(support.TESTFN, "wb")
557 f.write(b"abcxxx")
558 f.f = f
559 wr = weakref.ref(f)
560 del f
561 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +0300562 self.assertIsNone(wr(), wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000563 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000564 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000565
Antoine Pitrou19690592009-06-12 20:14:08 +0000566 def test_unbounded_file(self):
567 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
568 zero = "/dev/zero"
569 if not os.path.exists(zero):
570 self.skipTest("{0} does not exist".format(zero))
571 if sys.maxsize > 0x7FFFFFFF:
572 self.skipTest("test can only run in a 32-bit address space")
573 if support.real_max_memuse < support._2G:
574 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000575 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000576 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000577 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000578 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000579 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000580 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000581
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200582 def check_flush_error_on_close(self, *args, **kwargs):
583 # Test that the file is closed despite failed flush
584 # and that flush() is called before file closed.
585 f = self.open(*args, **kwargs)
586 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000587 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200588 closed[:] = [f.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000589 raise IOError()
590 f.flush = bad_flush
591 self.assertRaises(IOError, f.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600592 self.assertTrue(f.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200593 self.assertTrue(closed) # flush() called
594 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchaka437d5352015-02-23 00:28:38 +0200595 f.flush = lambda: None # break reference loop
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200596
597 def test_flush_error_on_close(self):
598 # raw file
599 # Issue #5700: io.FileIO calls flush() after file closed
600 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
601 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
602 self.check_flush_error_on_close(fd, 'wb', buffering=0)
603 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
604 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
605 os.close(fd)
606 # buffered io
607 self.check_flush_error_on_close(support.TESTFN, 'wb')
608 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
609 self.check_flush_error_on_close(fd, 'wb')
610 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
611 self.check_flush_error_on_close(fd, 'wb', closefd=False)
612 os.close(fd)
613 # text io
614 self.check_flush_error_on_close(support.TESTFN, 'w')
615 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
616 self.check_flush_error_on_close(fd, 'w')
617 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
618 self.check_flush_error_on_close(fd, 'w', closefd=False)
619 os.close(fd)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000620
621 def test_multi_close(self):
622 f = self.open(support.TESTFN, "wb", buffering=0)
623 f.close()
624 f.close()
625 f.close()
626 self.assertRaises(ValueError, f.flush)
627
Antoine Pitrou6391b342010-09-14 18:48:19 +0000628 def test_RawIOBase_read(self):
629 # Exercise the default RawIOBase.read() implementation (which calls
630 # readinto() internally).
631 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
632 self.assertEqual(rawio.read(2), b"ab")
633 self.assertEqual(rawio.read(2), b"c")
634 self.assertEqual(rawio.read(2), b"d")
635 self.assertEqual(rawio.read(2), None)
636 self.assertEqual(rawio.read(2), b"ef")
637 self.assertEqual(rawio.read(2), b"g")
638 self.assertEqual(rawio.read(2), None)
639 self.assertEqual(rawio.read(2), b"")
640
Hynek Schlawack877effc2012-05-25 09:24:18 +0200641 def test_fileio_closefd(self):
642 # Issue #4841
643 with self.open(__file__, 'rb') as f1, \
644 self.open(__file__, 'rb') as f2:
645 fileio = self.FileIO(f1.fileno(), closefd=False)
646 # .__init__() must not close f1
647 fileio.__init__(f2.fileno(), closefd=False)
648 f1.readline()
649 # .close() must not close f2
650 fileio.close()
651 f2.readline()
652
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +0300653 def test_nonbuffered_textio(self):
654 with warnings.catch_warnings(record=True) as recorded:
655 with self.assertRaises(ValueError):
656 self.open(support.TESTFN, 'w', buffering=0)
657 support.gc_collect()
658 self.assertEqual(recorded, [])
659
660 def test_invalid_newline(self):
661 with warnings.catch_warnings(record=True) as recorded:
662 with self.assertRaises(ValueError):
663 self.open(support.TESTFN, 'w', newline='invalid')
664 support.gc_collect()
665 self.assertEqual(recorded, [])
666
Martin Panterc9813d82016-06-03 05:59:20 +0000667 def test_buffered_readinto_mixin(self):
668 # Test the implementation provided by BufferedIOBase
669 class Stream(self.BufferedIOBase):
670 def read(self, size):
671 return b"12345"
672 stream = Stream()
673 buffer = byteslike(5)
674 self.assertEqual(stream.readinto(buffer), 5)
675 self.assertEqual(buffer.tobytes(), b"12345")
676
Hynek Schlawack877effc2012-05-25 09:24:18 +0200677
Antoine Pitrou19690592009-06-12 20:14:08 +0000678class CIOTest(IOTest):
Antoine Pitrou16166452011-07-12 22:04:20 +0200679
680 def test_IOBase_finalize(self):
681 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
682 # class which inherits IOBase and an object of this class are caught
683 # in a reference cycle and close() is already in the method cache.
684 class MyIO(self.IOBase):
685 def close(self):
686 pass
687
688 # create an instance to populate the method cache
689 MyIO()
690 obj = MyIO()
691 obj.obj = obj
692 wr = weakref.ref(obj)
693 del MyIO
694 del obj
695 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +0300696 self.assertIsNone(wr(), wr)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000697
Antoine Pitrou19690592009-06-12 20:14:08 +0000698class PyIOTest(IOTest):
699 test_array_writes = unittest.skip(
700 "len(array.array) returns number of elements rather than bytelength"
701 )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000702
703
Antoine Pitrou19690592009-06-12 20:14:08 +0000704class CommonBufferedTests:
705 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
706
707 def test_detach(self):
708 raw = self.MockRawIO()
709 buf = self.tp(raw)
710 self.assertIs(buf.detach(), raw)
711 self.assertRaises(ValueError, buf.detach)
712
Benjamin Peterson53ae6142014-12-21 20:51:50 -0600713 repr(buf) # Should still work
714
Antoine Pitrou19690592009-06-12 20:14:08 +0000715 def test_fileno(self):
716 rawio = self.MockRawIO()
717 bufio = self.tp(rawio)
718
Ezio Melotti2623a372010-11-21 13:34:58 +0000719 self.assertEqual(42, bufio.fileno())
Antoine Pitrou19690592009-06-12 20:14:08 +0000720
Antoine Pitrou19690592009-06-12 20:14:08 +0000721 def test_invalid_args(self):
722 rawio = self.MockRawIO()
723 bufio = self.tp(rawio)
724 # Invalid whence
725 self.assertRaises(ValueError, bufio.seek, 0, -1)
726 self.assertRaises(ValueError, bufio.seek, 0, 3)
727
728 def test_override_destructor(self):
729 tp = self.tp
730 record = []
731 class MyBufferedIO(tp):
732 def __del__(self):
733 record.append(1)
734 try:
735 f = super(MyBufferedIO, self).__del__
736 except AttributeError:
737 pass
738 else:
739 f()
740 def close(self):
741 record.append(2)
742 super(MyBufferedIO, self).close()
743 def flush(self):
744 record.append(3)
745 super(MyBufferedIO, self).flush()
746 rawio = self.MockRawIO()
747 bufio = MyBufferedIO(rawio)
748 writable = bufio.writable()
749 del bufio
750 support.gc_collect()
751 if writable:
752 self.assertEqual(record, [1, 2, 3])
753 else:
754 self.assertEqual(record, [1, 2])
755
756 def test_context_manager(self):
757 # Test usability as a context manager
758 rawio = self.MockRawIO()
759 bufio = self.tp(rawio)
760 def _with():
761 with bufio:
762 pass
763 _with()
764 # bufio should now be closed, and using it a second time should raise
765 # a ValueError.
766 self.assertRaises(ValueError, _with)
767
768 def test_error_through_destructor(self):
769 # Test that the exception state is not modified by a destructor,
770 # even if close() fails.
771 rawio = self.CloseFailureIO()
772 def f():
773 self.tp(rawio).xyzzy
774 with support.captured_output("stderr") as s:
775 self.assertRaises(AttributeError, f)
776 s = s.getvalue().strip()
777 if s:
778 # The destructor *may* have printed an unraisable error, check it
779 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000780 self.assertTrue(s.startswith("Exception IOError: "), s)
781 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000782
783 def test_repr(self):
784 raw = self.MockRawIO()
785 b = self.tp(raw)
786 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
787 self.assertEqual(repr(b), "<%s>" % clsname)
788 raw.name = "dummy"
789 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
790 raw.name = b"dummy"
791 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000792
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000793 def test_flush_error_on_close(self):
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200794 # Test that buffered file is closed despite failed flush
795 # and that flush() is called before file closed.
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000796 raw = self.MockRawIO()
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200797 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000798 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200799 closed[:] = [b.closed, raw.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000800 raise IOError()
801 raw.flush = bad_flush
802 b = self.tp(raw)
803 self.assertRaises(IOError, b.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600804 self.assertTrue(b.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200805 self.assertTrue(raw.closed)
806 self.assertTrue(closed) # flush() called
807 self.assertFalse(closed[0]) # flush() called before file closed
808 self.assertFalse(closed[1])
Serhiy Storchaka437d5352015-02-23 00:28:38 +0200809 raw.flush = lambda: None # break reference loop
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600810
811 def test_close_error_on_close(self):
812 raw = self.MockRawIO()
813 def bad_flush():
814 raise IOError('flush')
815 def bad_close():
816 raise IOError('close')
817 raw.close = bad_close
818 b = self.tp(raw)
819 b.flush = bad_flush
820 with self.assertRaises(IOError) as err: # exception not swallowed
821 b.close()
822 self.assertEqual(err.exception.args, ('close',))
823 self.assertFalse(b.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000824
825 def test_multi_close(self):
826 raw = self.MockRawIO()
827 b = self.tp(raw)
828 b.close()
829 b.close()
830 b.close()
831 self.assertRaises(ValueError, b.flush)
832
Antoine Pitroufc9ead62010-12-21 21:26:55 +0000833 def test_readonly_attributes(self):
834 raw = self.MockRawIO()
835 buf = self.tp(raw)
836 x = self.MockRawIO()
837 with self.assertRaises((AttributeError, TypeError)):
838 buf.raw = x
839
Christian Heimes1a6387e2008-03-26 12:49:49 +0000840
Antoine Pitroubff5df02012-07-29 19:02:46 +0200841class SizeofTest:
842
843 @support.cpython_only
844 def test_sizeof(self):
845 bufsize1 = 4096
846 bufsize2 = 8192
847 rawio = self.MockRawIO()
848 bufio = self.tp(rawio, buffer_size=bufsize1)
849 size = sys.getsizeof(bufio) - bufsize1
850 rawio = self.MockRawIO()
851 bufio = self.tp(rawio, buffer_size=bufsize2)
852 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
853
854
Antoine Pitrou19690592009-06-12 20:14:08 +0000855class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
856 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000857
Antoine Pitrou19690592009-06-12 20:14:08 +0000858 def test_constructor(self):
859 rawio = self.MockRawIO([b"abc"])
860 bufio = self.tp(rawio)
861 bufio.__init__(rawio)
862 bufio.__init__(rawio, buffer_size=1024)
863 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000864 self.assertEqual(b"abc", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000865 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
866 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
867 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
868 rawio = self.MockRawIO([b"abc"])
869 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000870 self.assertEqual(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000871
Serhiy Storchaka1d19f972014-02-12 10:52:07 +0200872 def test_uninitialized(self):
873 bufio = self.tp.__new__(self.tp)
874 del bufio
875 bufio = self.tp.__new__(self.tp)
876 self.assertRaisesRegexp((ValueError, AttributeError),
877 'uninitialized|has no attribute',
878 bufio.read, 0)
879 bufio.__init__(self.MockRawIO())
880 self.assertEqual(bufio.read(0), b'')
881
Antoine Pitrou19690592009-06-12 20:14:08 +0000882 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000883 for arg in (None, 7):
884 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
885 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000886 self.assertEqual(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000887 # Invalid args
888 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000889
Antoine Pitrou19690592009-06-12 20:14:08 +0000890 def test_read1(self):
891 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
892 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000893 self.assertEqual(b"a", bufio.read(1))
894 self.assertEqual(b"b", bufio.read1(1))
895 self.assertEqual(rawio._reads, 1)
896 self.assertEqual(b"c", bufio.read1(100))
897 self.assertEqual(rawio._reads, 1)
898 self.assertEqual(b"d", bufio.read1(100))
899 self.assertEqual(rawio._reads, 2)
900 self.assertEqual(b"efg", bufio.read1(100))
901 self.assertEqual(rawio._reads, 3)
902 self.assertEqual(b"", bufio.read1(100))
903 self.assertEqual(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000904 # Invalid args
905 self.assertRaises(ValueError, bufio.read1, -1)
906
907 def test_readinto(self):
908 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
909 bufio = self.tp(rawio)
910 b = bytearray(2)
Ezio Melotti2623a372010-11-21 13:34:58 +0000911 self.assertEqual(bufio.readinto(b), 2)
912 self.assertEqual(b, b"ab")
913 self.assertEqual(bufio.readinto(b), 2)
914 self.assertEqual(b, b"cd")
915 self.assertEqual(bufio.readinto(b), 2)
916 self.assertEqual(b, b"ef")
917 self.assertEqual(bufio.readinto(b), 1)
918 self.assertEqual(b, b"gf")
919 self.assertEqual(bufio.readinto(b), 0)
920 self.assertEqual(b, b"gf")
Antoine Pitrou19690592009-06-12 20:14:08 +0000921
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000922 def test_readlines(self):
923 def bufio():
924 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
925 return self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000926 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
927 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
928 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000929
Antoine Pitrou19690592009-06-12 20:14:08 +0000930 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000931 data = b"abcdefghi"
932 dlen = len(data)
933
934 tests = [
935 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
936 [ 100, [ 3, 3, 3], [ dlen ] ],
937 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
938 ]
939
940 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000941 rawio = self.MockFileIO(data)
942 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000943 pos = 0
944 for nbytes in buf_read_sizes:
Ezio Melotti2623a372010-11-21 13:34:58 +0000945 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Christian Heimes1a6387e2008-03-26 12:49:49 +0000946 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000947 # this is mildly implementation-dependent
Ezio Melotti2623a372010-11-21 13:34:58 +0000948 self.assertEqual(rawio.read_history, raw_read_sizes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000949
Antoine Pitrou19690592009-06-12 20:14:08 +0000950 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000951 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000952 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
953 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000954 self.assertEqual(b"abcd", bufio.read(6))
955 self.assertEqual(b"e", bufio.read(1))
956 self.assertEqual(b"fg", bufio.read())
957 self.assertEqual(b"", bufio.peek(1))
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200958 self.assertIsNone(bufio.read())
Ezio Melotti2623a372010-11-21 13:34:58 +0000959 self.assertEqual(b"", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000960
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200961 rawio = self.MockRawIO((b"a", None, None))
962 self.assertEqual(b"a", rawio.readall())
963 self.assertIsNone(rawio.readall())
964
Antoine Pitrou19690592009-06-12 20:14:08 +0000965 def test_read_past_eof(self):
966 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
967 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000968
Ezio Melotti2623a372010-11-21 13:34:58 +0000969 self.assertEqual(b"abcdefg", bufio.read(9000))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000970
Antoine Pitrou19690592009-06-12 20:14:08 +0000971 def test_read_all(self):
972 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
973 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000974
Ezio Melotti2623a372010-11-21 13:34:58 +0000975 self.assertEqual(b"abcdefg", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000976
Victor Stinner6a102812010-04-27 23:55:59 +0000977 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +0000978 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +0000979 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000980 try:
981 # Write out many bytes with exactly the same number of 0's,
982 # 1's... 255's. This will help us check that concurrent reading
983 # doesn't duplicate or forget contents.
984 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +0000985 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000986 random.shuffle(l)
987 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000988 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000989 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000990 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000991 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000992 errors = []
993 results = []
994 def f():
995 try:
996 # Intra-buffer read then buffer-flushing read
997 for n in cycle([1, 19]):
998 s = bufio.read(n)
999 if not s:
1000 break
1001 # list.append() is atomic
1002 results.append(s)
1003 except Exception as e:
1004 errors.append(e)
1005 raise
1006 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03001007 with support.start_threads(threads):
1008 time.sleep(0.02) # yield
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001009 self.assertFalse(errors,
1010 "the following exceptions were caught: %r" % errors)
1011 s = b''.join(results)
1012 for i in range(256):
1013 c = bytes(bytearray([i]))
1014 self.assertEqual(s.count(c), N)
1015 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001016 support.unlink(support.TESTFN)
1017
1018 def test_misbehaved_io(self):
1019 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1020 bufio = self.tp(rawio)
1021 self.assertRaises(IOError, bufio.seek, 0)
1022 self.assertRaises(IOError, bufio.tell)
1023
Antoine Pitroucb4f47c2010-08-11 13:40:17 +00001024 def test_no_extraneous_read(self):
1025 # Issue #9550; when the raw IO object has satisfied the read request,
1026 # we should not issue any additional reads, otherwise it may block
1027 # (e.g. socket).
1028 bufsize = 16
1029 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1030 rawio = self.MockRawIO([b"x" * n])
1031 bufio = self.tp(rawio, bufsize)
1032 self.assertEqual(bufio.read(n), b"x" * n)
1033 # Simple case: one raw read is enough to satisfy the request.
1034 self.assertEqual(rawio._extraneous_reads, 0,
1035 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1036 # A more complex case where two raw reads are needed to satisfy
1037 # the request.
1038 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1039 bufio = self.tp(rawio, bufsize)
1040 self.assertEqual(bufio.read(n), b"x" * n)
1041 self.assertEqual(rawio._extraneous_reads, 0,
1042 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1043
1044
Antoine Pitroubff5df02012-07-29 19:02:46 +02001045class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001046 tp = io.BufferedReader
1047
1048 def test_constructor(self):
1049 BufferedReaderTest.test_constructor(self)
1050 # The allocation can succeed on 32-bit builds, e.g. with more
1051 # than 2GB RAM and a 64-bit kernel.
1052 if sys.maxsize > 0x7FFFFFFF:
1053 rawio = self.MockRawIO()
1054 bufio = self.tp(rawio)
1055 self.assertRaises((OverflowError, MemoryError, ValueError),
1056 bufio.__init__, rawio, sys.maxsize)
1057
1058 def test_initialization(self):
1059 rawio = self.MockRawIO([b"abc"])
1060 bufio = self.tp(rawio)
1061 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1062 self.assertRaises(ValueError, bufio.read)
1063 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1064 self.assertRaises(ValueError, bufio.read)
1065 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1066 self.assertRaises(ValueError, bufio.read)
1067
1068 def test_misbehaved_io_read(self):
1069 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1070 bufio = self.tp(rawio)
1071 # _pyio.BufferedReader seems to implement reading different, so that
1072 # checking this is not so easy.
1073 self.assertRaises(IOError, bufio.read, 10)
1074
1075 def test_garbage_collection(self):
1076 # C BufferedReader objects are collected.
1077 # The Python version has __del__, so it ends into gc.garbage instead
1078 rawio = self.FileIO(support.TESTFN, "w+b")
1079 f = self.tp(rawio)
1080 f.f = f
1081 wr = weakref.ref(f)
1082 del f
1083 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03001084 self.assertIsNone(wr(), wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00001085
R David Murray5b2cf5e2013-02-23 22:11:21 -05001086 def test_args_error(self):
1087 # Issue #17275
1088 with self.assertRaisesRegexp(TypeError, "BufferedReader"):
1089 self.tp(io.BytesIO(), 1024, 1024, 1024)
1090
1091
Antoine Pitrou19690592009-06-12 20:14:08 +00001092class PyBufferedReaderTest(BufferedReaderTest):
1093 tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001094
1095
Antoine Pitrou19690592009-06-12 20:14:08 +00001096class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1097 write_mode = "wb"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001098
Antoine Pitrou19690592009-06-12 20:14:08 +00001099 def test_constructor(self):
1100 rawio = self.MockRawIO()
1101 bufio = self.tp(rawio)
1102 bufio.__init__(rawio)
1103 bufio.__init__(rawio, buffer_size=1024)
1104 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +00001105 self.assertEqual(3, bufio.write(b"abc"))
Antoine Pitrou19690592009-06-12 20:14:08 +00001106 bufio.flush()
1107 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1108 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1109 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1110 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +00001111 self.assertEqual(3, bufio.write(b"ghi"))
Antoine Pitrou19690592009-06-12 20:14:08 +00001112 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001113 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001114
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001115 def test_uninitialized(self):
1116 bufio = self.tp.__new__(self.tp)
1117 del bufio
1118 bufio = self.tp.__new__(self.tp)
1119 self.assertRaisesRegexp((ValueError, AttributeError),
1120 'uninitialized|has no attribute',
1121 bufio.write, b'')
1122 bufio.__init__(self.MockRawIO())
1123 self.assertEqual(bufio.write(b''), 0)
1124
Antoine Pitrou19690592009-06-12 20:14:08 +00001125 def test_detach_flush(self):
1126 raw = self.MockRawIO()
1127 buf = self.tp(raw)
1128 buf.write(b"howdy!")
1129 self.assertFalse(raw._write_stack)
1130 buf.detach()
1131 self.assertEqual(raw._write_stack, [b"howdy!"])
1132
1133 def test_write(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001134 # Write to the buffered IO but don't overflow the buffer.
Antoine Pitrou19690592009-06-12 20:14:08 +00001135 writer = self.MockRawIO()
1136 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001137 bufio.write(b"abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001138 self.assertFalse(writer._write_stack)
Martin Panterc9813d82016-06-03 05:59:20 +00001139 buffer = bytearray(b"def")
1140 bufio.write(buffer)
1141 buffer[:] = b"***" # Overwrite our copy of the data
1142 bufio.flush()
1143 self.assertEqual(b"".join(writer._write_stack), b"abcdef")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001144
Antoine Pitrou19690592009-06-12 20:14:08 +00001145 def test_write_overflow(self):
1146 writer = self.MockRawIO()
1147 bufio = self.tp(writer, 8)
1148 contents = b"abcdefghijklmnop"
1149 for n in range(0, len(contents), 3):
1150 bufio.write(contents[n:n+3])
1151 flushed = b"".join(writer._write_stack)
1152 # At least (total - 8) bytes were implicitly flushed, perhaps more
1153 # depending on the implementation.
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001154 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001155
Antoine Pitrou19690592009-06-12 20:14:08 +00001156 def check_writes(self, intermediate_func):
1157 # Lots of writes, test the flushed output is as expected.
1158 contents = bytes(range(256)) * 1000
1159 n = 0
1160 writer = self.MockRawIO()
1161 bufio = self.tp(writer, 13)
1162 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1163 def gen_sizes():
1164 for size in count(1):
1165 for i in range(15):
1166 yield size
1167 sizes = gen_sizes()
1168 while n < len(contents):
1169 size = min(next(sizes), len(contents) - n)
Ezio Melotti2623a372010-11-21 13:34:58 +00001170 self.assertEqual(bufio.write(contents[n:n+size]), size)
Antoine Pitrou19690592009-06-12 20:14:08 +00001171 intermediate_func(bufio)
1172 n += size
1173 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001174 self.assertEqual(contents,
Antoine Pitrou19690592009-06-12 20:14:08 +00001175 b"".join(writer._write_stack))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001176
Antoine Pitrou19690592009-06-12 20:14:08 +00001177 def test_writes(self):
1178 self.check_writes(lambda bufio: None)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001179
Antoine Pitrou19690592009-06-12 20:14:08 +00001180 def test_writes_and_flushes(self):
1181 self.check_writes(lambda bufio: bufio.flush())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001182
Antoine Pitrou19690592009-06-12 20:14:08 +00001183 def test_writes_and_seeks(self):
1184 def _seekabs(bufio):
1185 pos = bufio.tell()
1186 bufio.seek(pos + 1, 0)
1187 bufio.seek(pos - 1, 0)
1188 bufio.seek(pos, 0)
1189 self.check_writes(_seekabs)
1190 def _seekrel(bufio):
1191 pos = bufio.seek(0, 1)
1192 bufio.seek(+1, 1)
1193 bufio.seek(-1, 1)
1194 bufio.seek(pos, 0)
1195 self.check_writes(_seekrel)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001196
Antoine Pitrou19690592009-06-12 20:14:08 +00001197 def test_writes_and_truncates(self):
1198 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001199
Antoine Pitrou19690592009-06-12 20:14:08 +00001200 def test_write_non_blocking(self):
1201 raw = self.MockNonBlockWriterIO()
1202 bufio = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001203
Ezio Melotti2623a372010-11-21 13:34:58 +00001204 self.assertEqual(bufio.write(b"abcd"), 4)
1205 self.assertEqual(bufio.write(b"efghi"), 5)
Antoine Pitrou19690592009-06-12 20:14:08 +00001206 # 1 byte will be written, the rest will be buffered
1207 raw.block_on(b"k")
Ezio Melotti2623a372010-11-21 13:34:58 +00001208 self.assertEqual(bufio.write(b"jklmn"), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001209
Antoine Pitrou19690592009-06-12 20:14:08 +00001210 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1211 raw.block_on(b"0")
1212 try:
1213 bufio.write(b"opqrwxyz0123456789")
1214 except self.BlockingIOError as e:
1215 written = e.characters_written
1216 else:
1217 self.fail("BlockingIOError should have been raised")
Ezio Melotti2623a372010-11-21 13:34:58 +00001218 self.assertEqual(written, 16)
1219 self.assertEqual(raw.pop_written(),
Antoine Pitrou19690592009-06-12 20:14:08 +00001220 b"abcdefghijklmnopqrwxyz")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001221
Ezio Melotti2623a372010-11-21 13:34:58 +00001222 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Antoine Pitrou19690592009-06-12 20:14:08 +00001223 s = raw.pop_written()
1224 # Previously buffered bytes were flushed
1225 self.assertTrue(s.startswith(b"01234567A"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001226
Antoine Pitrou19690592009-06-12 20:14:08 +00001227 def test_write_and_rewind(self):
1228 raw = io.BytesIO()
1229 bufio = self.tp(raw, 4)
1230 self.assertEqual(bufio.write(b"abcdef"), 6)
1231 self.assertEqual(bufio.tell(), 6)
1232 bufio.seek(0, 0)
1233 self.assertEqual(bufio.write(b"XY"), 2)
1234 bufio.seek(6, 0)
1235 self.assertEqual(raw.getvalue(), b"XYcdef")
1236 self.assertEqual(bufio.write(b"123456"), 6)
1237 bufio.flush()
1238 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001239
Antoine Pitrou19690592009-06-12 20:14:08 +00001240 def test_flush(self):
1241 writer = self.MockRawIO()
1242 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001243 bufio.write(b"abc")
1244 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001245 self.assertEqual(b"abc", writer._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001246
Antoine Pitrou78e761e2012-10-16 22:57:11 +02001247 def test_writelines(self):
1248 l = [b'ab', b'cd', b'ef']
1249 writer = self.MockRawIO()
1250 bufio = self.tp(writer, 8)
1251 bufio.writelines(l)
1252 bufio.flush()
1253 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1254
1255 def test_writelines_userlist(self):
1256 l = UserList([b'ab', b'cd', b'ef'])
1257 writer = self.MockRawIO()
1258 bufio = self.tp(writer, 8)
1259 bufio.writelines(l)
1260 bufio.flush()
1261 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1262
1263 def test_writelines_error(self):
1264 writer = self.MockRawIO()
1265 bufio = self.tp(writer, 8)
1266 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1267 self.assertRaises(TypeError, bufio.writelines, None)
1268
Antoine Pitrou19690592009-06-12 20:14:08 +00001269 def test_destructor(self):
1270 writer = self.MockRawIO()
1271 bufio = self.tp(writer, 8)
1272 bufio.write(b"abc")
1273 del bufio
1274 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001275 self.assertEqual(b"abc", writer._write_stack[0])
Antoine Pitrou19690592009-06-12 20:14:08 +00001276
1277 def test_truncate(self):
1278 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001279 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001280 bufio = self.tp(raw, 8)
1281 bufio.write(b"abcdef")
1282 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001283 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001284 with self.open(support.TESTFN, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001285 self.assertEqual(f.read(), b"abc")
1286
Victor Stinner6a102812010-04-27 23:55:59 +00001287 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +00001288 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +00001289 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001290 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001291 # Write out many bytes from many threads and test they were
1292 # all flushed.
1293 N = 1000
1294 contents = bytes(range(256)) * N
1295 sizes = cycle([1, 19])
1296 n = 0
1297 queue = deque()
1298 while n < len(contents):
1299 size = next(sizes)
1300 queue.append(contents[n:n+size])
1301 n += size
1302 del contents
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001303 # We use a real file object because it allows us to
1304 # exercise situations where the GIL is released before
1305 # writing the buffer to the raw streams. This is in addition
1306 # to concurrency issues due to switching threads in the middle
1307 # of Python code.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001308 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001309 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001310 errors = []
1311 def f():
1312 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001313 while True:
1314 try:
1315 s = queue.popleft()
1316 except IndexError:
1317 return
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001318 bufio.write(s)
1319 except Exception as e:
1320 errors.append(e)
1321 raise
1322 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03001323 with support.start_threads(threads):
1324 time.sleep(0.02) # yield
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001325 self.assertFalse(errors,
1326 "the following exceptions were caught: %r" % errors)
Antoine Pitrou19690592009-06-12 20:14:08 +00001327 bufio.close()
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001328 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001329 s = f.read()
1330 for i in range(256):
Ezio Melotti2623a372010-11-21 13:34:58 +00001331 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001332 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001333 support.unlink(support.TESTFN)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001334
Antoine Pitrou19690592009-06-12 20:14:08 +00001335 def test_misbehaved_io(self):
1336 rawio = self.MisbehavedRawIO()
1337 bufio = self.tp(rawio, 5)
1338 self.assertRaises(IOError, bufio.seek, 0)
1339 self.assertRaises(IOError, bufio.tell)
1340 self.assertRaises(IOError, bufio.write, b"abcdef")
1341
1342 def test_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001343 with support.check_warnings(("max_buffer_size is deprecated",
1344 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001345 self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001346
Benjamin Petersona2d6d712012-12-20 12:24:10 -06001347 def test_write_error_on_close(self):
1348 raw = self.MockRawIO()
1349 def bad_write(b):
1350 raise IOError()
1351 raw.write = bad_write
1352 b = self.tp(raw)
1353 b.write(b'spam')
1354 self.assertRaises(IOError, b.close) # exception not swallowed
1355 self.assertTrue(b.closed)
1356
Antoine Pitrou19690592009-06-12 20:14:08 +00001357
Antoine Pitroubff5df02012-07-29 19:02:46 +02001358class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001359 tp = io.BufferedWriter
1360
1361 def test_constructor(self):
1362 BufferedWriterTest.test_constructor(self)
1363 # The allocation can succeed on 32-bit builds, e.g. with more
1364 # than 2GB RAM and a 64-bit kernel.
1365 if sys.maxsize > 0x7FFFFFFF:
1366 rawio = self.MockRawIO()
1367 bufio = self.tp(rawio)
1368 self.assertRaises((OverflowError, MemoryError, ValueError),
1369 bufio.__init__, rawio, sys.maxsize)
1370
1371 def test_initialization(self):
1372 rawio = self.MockRawIO()
1373 bufio = self.tp(rawio)
1374 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1375 self.assertRaises(ValueError, bufio.write, b"def")
1376 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1377 self.assertRaises(ValueError, bufio.write, b"def")
1378 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1379 self.assertRaises(ValueError, bufio.write, b"def")
1380
1381 def test_garbage_collection(self):
1382 # C BufferedWriter objects are collected, and collecting them flushes
1383 # all data to disk.
1384 # The Python version has __del__, so it ends into gc.garbage instead
1385 rawio = self.FileIO(support.TESTFN, "w+b")
1386 f = self.tp(rawio)
1387 f.write(b"123xxx")
1388 f.x = f
1389 wr = weakref.ref(f)
1390 del f
1391 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03001392 self.assertIsNone(wr(), wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001393 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001394 self.assertEqual(f.read(), b"123xxx")
1395
R David Murray5b2cf5e2013-02-23 22:11:21 -05001396 def test_args_error(self):
1397 # Issue #17275
1398 with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
1399 self.tp(io.BytesIO(), 1024, 1024, 1024)
1400
Antoine Pitrou19690592009-06-12 20:14:08 +00001401
1402class PyBufferedWriterTest(BufferedWriterTest):
1403 tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001404
1405class BufferedRWPairTest(unittest.TestCase):
1406
Antoine Pitrou19690592009-06-12 20:14:08 +00001407 def test_constructor(self):
1408 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001409 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001410
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001411 def test_uninitialized(self):
1412 pair = self.tp.__new__(self.tp)
1413 del pair
1414 pair = self.tp.__new__(self.tp)
1415 self.assertRaisesRegexp((ValueError, AttributeError),
1416 'uninitialized|has no attribute',
1417 pair.read, 0)
1418 self.assertRaisesRegexp((ValueError, AttributeError),
1419 'uninitialized|has no attribute',
1420 pair.write, b'')
1421 pair.__init__(self.MockRawIO(), self.MockRawIO())
1422 self.assertEqual(pair.read(0), b'')
1423 self.assertEqual(pair.write(b''), 0)
1424
Antoine Pitrou19690592009-06-12 20:14:08 +00001425 def test_detach(self):
1426 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1427 self.assertRaises(self.UnsupportedOperation, pair.detach)
1428
1429 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001430 with support.check_warnings(("max_buffer_size is deprecated",
1431 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001432 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001433
1434 def test_constructor_with_not_readable(self):
1435 class NotReadable(MockRawIO):
1436 def readable(self):
1437 return False
1438
1439 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1440
1441 def test_constructor_with_not_writeable(self):
1442 class NotWriteable(MockRawIO):
1443 def writable(self):
1444 return False
1445
1446 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1447
1448 def test_read(self):
1449 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1450
1451 self.assertEqual(pair.read(3), b"abc")
1452 self.assertEqual(pair.read(1), b"d")
1453 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001454 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1455 self.assertEqual(pair.read(None), b"abc")
1456
1457 def test_readlines(self):
1458 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1459 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1460 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1461 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001462
1463 def test_read1(self):
1464 # .read1() is delegated to the underlying reader object, so this test
1465 # can be shallow.
1466 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1467
1468 self.assertEqual(pair.read1(3), b"abc")
1469
1470 def test_readinto(self):
1471 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1472
Martin Panterc9813d82016-06-03 05:59:20 +00001473 data = byteslike(5)
Antoine Pitrou19690592009-06-12 20:14:08 +00001474 self.assertEqual(pair.readinto(data), 5)
Martin Panterc9813d82016-06-03 05:59:20 +00001475 self.assertEqual(data.tobytes(), b"abcde")
Antoine Pitrou19690592009-06-12 20:14:08 +00001476
1477 def test_write(self):
1478 w = self.MockRawIO()
1479 pair = self.tp(self.MockRawIO(), w)
1480
1481 pair.write(b"abc")
1482 pair.flush()
Martin Panterc9813d82016-06-03 05:59:20 +00001483 buffer = bytearray(b"def")
1484 pair.write(buffer)
1485 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitrou19690592009-06-12 20:14:08 +00001486 pair.flush()
1487 self.assertEqual(w._write_stack, [b"abc", b"def"])
1488
1489 def test_peek(self):
1490 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1491
1492 self.assertTrue(pair.peek(3).startswith(b"abc"))
1493 self.assertEqual(pair.read(3), b"abc")
1494
1495 def test_readable(self):
1496 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1497 self.assertTrue(pair.readable())
1498
1499 def test_writeable(self):
1500 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1501 self.assertTrue(pair.writable())
1502
1503 def test_seekable(self):
1504 # BufferedRWPairs are never seekable, even if their readers and writers
1505 # are.
1506 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1507 self.assertFalse(pair.seekable())
1508
1509 # .flush() is delegated to the underlying writer object and has been
1510 # tested in the test_write method.
1511
1512 def test_close_and_closed(self):
1513 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1514 self.assertFalse(pair.closed)
1515 pair.close()
1516 self.assertTrue(pair.closed)
1517
Serhiy Storchakaf95a57f2015-03-24 23:23:42 +02001518 def test_reader_close_error_on_close(self):
1519 def reader_close():
1520 reader_non_existing
1521 reader = self.MockRawIO()
1522 reader.close = reader_close
1523 writer = self.MockRawIO()
1524 pair = self.tp(reader, writer)
1525 with self.assertRaises(NameError) as err:
1526 pair.close()
1527 self.assertIn('reader_non_existing', str(err.exception))
1528 self.assertTrue(pair.closed)
1529 self.assertFalse(reader.closed)
1530 self.assertTrue(writer.closed)
1531
1532 def test_writer_close_error_on_close(self):
1533 def writer_close():
1534 writer_non_existing
1535 reader = self.MockRawIO()
1536 writer = self.MockRawIO()
1537 writer.close = writer_close
1538 pair = self.tp(reader, writer)
1539 with self.assertRaises(NameError) as err:
1540 pair.close()
1541 self.assertIn('writer_non_existing', str(err.exception))
1542 self.assertFalse(pair.closed)
1543 self.assertTrue(reader.closed)
1544 self.assertFalse(writer.closed)
1545
1546 def test_reader_writer_close_error_on_close(self):
1547 def reader_close():
1548 reader_non_existing
1549 def writer_close():
1550 writer_non_existing
1551 reader = self.MockRawIO()
1552 reader.close = reader_close
1553 writer = self.MockRawIO()
1554 writer.close = writer_close
1555 pair = self.tp(reader, writer)
1556 with self.assertRaises(NameError) as err:
1557 pair.close()
1558 self.assertIn('reader_non_existing', str(err.exception))
1559 self.assertFalse(pair.closed)
1560 self.assertFalse(reader.closed)
1561 self.assertFalse(writer.closed)
1562
Antoine Pitrou19690592009-06-12 20:14:08 +00001563 def test_isatty(self):
1564 class SelectableIsAtty(MockRawIO):
1565 def __init__(self, isatty):
1566 MockRawIO.__init__(self)
1567 self._isatty = isatty
1568
1569 def isatty(self):
1570 return self._isatty
1571
1572 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1573 self.assertFalse(pair.isatty())
1574
1575 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1576 self.assertTrue(pair.isatty())
1577
1578 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1579 self.assertTrue(pair.isatty())
1580
1581 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1582 self.assertTrue(pair.isatty())
1583
Benjamin Peterson1c873bf2014-09-29 22:46:57 -04001584 def test_weakref_clearing(self):
1585 brw = self.tp(self.MockRawIO(), self.MockRawIO())
1586 ref = weakref.ref(brw)
1587 brw = None
1588 ref = None # Shouldn't segfault.
1589
Antoine Pitrou19690592009-06-12 20:14:08 +00001590class CBufferedRWPairTest(BufferedRWPairTest):
1591 tp = io.BufferedRWPair
1592
1593class PyBufferedRWPairTest(BufferedRWPairTest):
1594 tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001595
1596
Antoine Pitrou19690592009-06-12 20:14:08 +00001597class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1598 read_mode = "rb+"
1599 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001600
Antoine Pitrou19690592009-06-12 20:14:08 +00001601 def test_constructor(self):
1602 BufferedReaderTest.test_constructor(self)
1603 BufferedWriterTest.test_constructor(self)
1604
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001605 def test_uninitialized(self):
1606 BufferedReaderTest.test_uninitialized(self)
1607 BufferedWriterTest.test_uninitialized(self)
1608
Antoine Pitrou19690592009-06-12 20:14:08 +00001609 def test_read_and_write(self):
1610 raw = self.MockRawIO((b"asdf", b"ghjk"))
1611 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001612
1613 self.assertEqual(b"as", rw.read(2))
1614 rw.write(b"ddd")
1615 rw.write(b"eee")
1616 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001617 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001618 self.assertEqual(b"dddeee", raw._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001619
Antoine Pitrou19690592009-06-12 20:14:08 +00001620 def test_seek_and_tell(self):
1621 raw = self.BytesIO(b"asdfghjkl")
1622 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001623
Ezio Melotti2623a372010-11-21 13:34:58 +00001624 self.assertEqual(b"as", rw.read(2))
1625 self.assertEqual(2, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001626 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001627 self.assertEqual(b"asdf", rw.read(4))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001628
Antoine Pitrou808cec52011-08-20 15:40:58 +02001629 rw.write(b"123f")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001630 rw.seek(0, 0)
Antoine Pitrou808cec52011-08-20 15:40:58 +02001631 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001632 self.assertEqual(9, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001633 rw.seek(-4, 2)
Ezio Melotti2623a372010-11-21 13:34:58 +00001634 self.assertEqual(5, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001635 rw.seek(2, 1)
Ezio Melotti2623a372010-11-21 13:34:58 +00001636 self.assertEqual(7, rw.tell())
1637 self.assertEqual(b"fl", rw.read(11))
Antoine Pitrou808cec52011-08-20 15:40:58 +02001638 rw.flush()
1639 self.assertEqual(b"asdf123fl", raw.getvalue())
1640
Christian Heimes1a6387e2008-03-26 12:49:49 +00001641 self.assertRaises(TypeError, rw.seek, 0.0)
1642
Antoine Pitrou19690592009-06-12 20:14:08 +00001643 def check_flush_and_read(self, read_func):
1644 raw = self.BytesIO(b"abcdefghi")
1645 bufio = self.tp(raw)
1646
Ezio Melotti2623a372010-11-21 13:34:58 +00001647 self.assertEqual(b"ab", read_func(bufio, 2))
Antoine Pitrou19690592009-06-12 20:14:08 +00001648 bufio.write(b"12")
Ezio Melotti2623a372010-11-21 13:34:58 +00001649 self.assertEqual(b"ef", read_func(bufio, 2))
1650 self.assertEqual(6, bufio.tell())
Antoine Pitrou19690592009-06-12 20:14:08 +00001651 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001652 self.assertEqual(6, bufio.tell())
1653 self.assertEqual(b"ghi", read_func(bufio))
Antoine Pitrou19690592009-06-12 20:14:08 +00001654 raw.seek(0, 0)
1655 raw.write(b"XYZ")
1656 # flush() resets the read buffer
1657 bufio.flush()
1658 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001659 self.assertEqual(b"XYZ", read_func(bufio, 3))
Antoine Pitrou19690592009-06-12 20:14:08 +00001660
1661 def test_flush_and_read(self):
1662 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1663
1664 def test_flush_and_readinto(self):
1665 def _readinto(bufio, n=-1):
1666 b = bytearray(n if n >= 0 else 9999)
1667 n = bufio.readinto(b)
1668 return bytes(b[:n])
1669 self.check_flush_and_read(_readinto)
1670
1671 def test_flush_and_peek(self):
1672 def _peek(bufio, n=-1):
1673 # This relies on the fact that the buffer can contain the whole
1674 # raw stream, otherwise peek() can return less.
1675 b = bufio.peek(n)
1676 if n != -1:
1677 b = b[:n]
1678 bufio.seek(len(b), 1)
1679 return b
1680 self.check_flush_and_read(_peek)
1681
1682 def test_flush_and_write(self):
1683 raw = self.BytesIO(b"abcdefghi")
1684 bufio = self.tp(raw)
1685
1686 bufio.write(b"123")
1687 bufio.flush()
1688 bufio.write(b"45")
1689 bufio.flush()
1690 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001691 self.assertEqual(b"12345fghi", raw.getvalue())
1692 self.assertEqual(b"12345fghi", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +00001693
1694 def test_threads(self):
1695 BufferedReaderTest.test_threads(self)
1696 BufferedWriterTest.test_threads(self)
1697
1698 def test_writes_and_peek(self):
1699 def _peek(bufio):
1700 bufio.peek(1)
1701 self.check_writes(_peek)
1702 def _peek(bufio):
1703 pos = bufio.tell()
1704 bufio.seek(-1, 1)
1705 bufio.peek(1)
1706 bufio.seek(pos, 0)
1707 self.check_writes(_peek)
1708
1709 def test_writes_and_reads(self):
1710 def _read(bufio):
1711 bufio.seek(-1, 1)
1712 bufio.read(1)
1713 self.check_writes(_read)
1714
1715 def test_writes_and_read1s(self):
1716 def _read1(bufio):
1717 bufio.seek(-1, 1)
1718 bufio.read1(1)
1719 self.check_writes(_read1)
1720
1721 def test_writes_and_readintos(self):
1722 def _read(bufio):
1723 bufio.seek(-1, 1)
1724 bufio.readinto(bytearray(1))
1725 self.check_writes(_read)
1726
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001727 def test_write_after_readahead(self):
1728 # Issue #6629: writing after the buffer was filled by readahead should
1729 # first rewind the raw stream.
1730 for overwrite_size in [1, 5]:
1731 raw = self.BytesIO(b"A" * 10)
1732 bufio = self.tp(raw, 4)
1733 # Trigger readahead
1734 self.assertEqual(bufio.read(1), b"A")
1735 self.assertEqual(bufio.tell(), 1)
1736 # Overwriting should rewind the raw stream if it needs so
1737 bufio.write(b"B" * overwrite_size)
1738 self.assertEqual(bufio.tell(), overwrite_size + 1)
1739 # If the write size was smaller than the buffer size, flush() and
1740 # check that rewind happens.
1741 bufio.flush()
1742 self.assertEqual(bufio.tell(), overwrite_size + 1)
1743 s = raw.getvalue()
1744 self.assertEqual(s,
1745 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1746
Antoine Pitrouee46a7b2011-05-13 00:31:52 +02001747 def test_write_rewind_write(self):
1748 # Various combinations of reading / writing / seeking backwards / writing again
1749 def mutate(bufio, pos1, pos2):
1750 assert pos2 >= pos1
1751 # Fill the buffer
1752 bufio.seek(pos1)
1753 bufio.read(pos2 - pos1)
1754 bufio.write(b'\x02')
1755 # This writes earlier than the previous write, but still inside
1756 # the buffer.
1757 bufio.seek(pos1)
1758 bufio.write(b'\x01')
1759
1760 b = b"\x80\x81\x82\x83\x84"
1761 for i in range(0, len(b)):
1762 for j in range(i, len(b)):
1763 raw = self.BytesIO(b)
1764 bufio = self.tp(raw, 100)
1765 mutate(bufio, i, j)
1766 bufio.flush()
1767 expected = bytearray(b)
1768 expected[j] = 2
1769 expected[i] = 1
1770 self.assertEqual(raw.getvalue(), expected,
1771 "failed result for i=%d, j=%d" % (i, j))
1772
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001773 def test_truncate_after_read_or_write(self):
1774 raw = self.BytesIO(b"A" * 10)
1775 bufio = self.tp(raw, 100)
1776 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1777 self.assertEqual(bufio.truncate(), 2)
1778 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1779 self.assertEqual(bufio.truncate(), 4)
1780
Antoine Pitrou19690592009-06-12 20:14:08 +00001781 def test_misbehaved_io(self):
1782 BufferedReaderTest.test_misbehaved_io(self)
1783 BufferedWriterTest.test_misbehaved_io(self)
1784
Antoine Pitrou808cec52011-08-20 15:40:58 +02001785 def test_interleaved_read_write(self):
1786 # Test for issue #12213
1787 with self.BytesIO(b'abcdefgh') as raw:
1788 with self.tp(raw, 100) as f:
1789 f.write(b"1")
1790 self.assertEqual(f.read(1), b'b')
1791 f.write(b'2')
1792 self.assertEqual(f.read1(1), b'd')
1793 f.write(b'3')
1794 buf = bytearray(1)
1795 f.readinto(buf)
1796 self.assertEqual(buf, b'f')
1797 f.write(b'4')
1798 self.assertEqual(f.peek(1), b'h')
1799 f.flush()
1800 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1801
1802 with self.BytesIO(b'abc') as raw:
1803 with self.tp(raw, 100) as f:
1804 self.assertEqual(f.read(1), b'a')
1805 f.write(b"2")
1806 self.assertEqual(f.read(1), b'c')
1807 f.flush()
1808 self.assertEqual(raw.getvalue(), b'a2c')
1809
1810 def test_interleaved_readline_write(self):
1811 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1812 with self.tp(raw) as f:
1813 f.write(b'1')
1814 self.assertEqual(f.readline(), b'b\n')
1815 f.write(b'2')
1816 self.assertEqual(f.readline(), b'def\n')
1817 f.write(b'3')
1818 self.assertEqual(f.readline(), b'\n')
1819 f.flush()
1820 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1821
R David Murray5b2cf5e2013-02-23 22:11:21 -05001822
Antoine Pitroubff5df02012-07-29 19:02:46 +02001823class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
1824 BufferedRandomTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001825 tp = io.BufferedRandom
1826
1827 def test_constructor(self):
1828 BufferedRandomTest.test_constructor(self)
1829 # The allocation can succeed on 32-bit builds, e.g. with more
1830 # than 2GB RAM and a 64-bit kernel.
1831 if sys.maxsize > 0x7FFFFFFF:
1832 rawio = self.MockRawIO()
1833 bufio = self.tp(rawio)
1834 self.assertRaises((OverflowError, MemoryError, ValueError),
1835 bufio.__init__, rawio, sys.maxsize)
1836
1837 def test_garbage_collection(self):
1838 CBufferedReaderTest.test_garbage_collection(self)
1839 CBufferedWriterTest.test_garbage_collection(self)
1840
R David Murray5b2cf5e2013-02-23 22:11:21 -05001841 def test_args_error(self):
1842 # Issue #17275
1843 with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
1844 self.tp(io.BytesIO(), 1024, 1024, 1024)
1845
1846
Antoine Pitrou19690592009-06-12 20:14:08 +00001847class PyBufferedRandomTest(BufferedRandomTest):
1848 tp = pyio.BufferedRandom
1849
1850
Christian Heimes1a6387e2008-03-26 12:49:49 +00001851# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1852# properties:
1853# - A single output character can correspond to many bytes of input.
1854# - The number of input bytes to complete the character can be
1855# undetermined until the last input byte is received.
1856# - The number of input bytes can vary depending on previous input.
1857# - A single input byte can correspond to many characters of output.
1858# - The number of output characters can be undetermined until the
1859# last input byte is received.
1860# - The number of output characters can vary depending on previous input.
1861
1862class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1863 """
1864 For testing seek/tell behavior with a stateful, buffering decoder.
1865
1866 Input is a sequence of words. Words may be fixed-length (length set
1867 by input) or variable-length (period-terminated). In variable-length
1868 mode, extra periods are ignored. Possible words are:
1869 - 'i' followed by a number sets the input length, I (maximum 99).
1870 When I is set to 0, words are space-terminated.
1871 - 'o' followed by a number sets the output length, O (maximum 99).
1872 - Any other word is converted into a word followed by a period on
1873 the output. The output word consists of the input word truncated
1874 or padded out with hyphens to make its length equal to O. If O
1875 is 0, the word is output verbatim without truncating or padding.
1876 I and O are initially set to 1. When I changes, any buffered input is
1877 re-scanned according to the new I. EOF also terminates the last word.
1878 """
1879
1880 def __init__(self, errors='strict'):
1881 codecs.IncrementalDecoder.__init__(self, errors)
1882 self.reset()
1883
1884 def __repr__(self):
1885 return '<SID %x>' % id(self)
1886
1887 def reset(self):
1888 self.i = 1
1889 self.o = 1
1890 self.buffer = bytearray()
1891
1892 def getstate(self):
1893 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1894 return bytes(self.buffer), i*100 + o
1895
1896 def setstate(self, state):
1897 buffer, io = state
1898 self.buffer = bytearray(buffer)
1899 i, o = divmod(io, 100)
1900 self.i, self.o = i ^ 1, o ^ 1
1901
1902 def decode(self, input, final=False):
1903 output = ''
1904 for b in input:
1905 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001906 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +00001907 if self.buffer:
1908 output += self.process_word()
1909 else:
1910 self.buffer.append(b)
1911 else: # fixed-length, terminate after self.i bytes
1912 self.buffer.append(b)
1913 if len(self.buffer) == self.i:
1914 output += self.process_word()
1915 if final and self.buffer: # EOF terminates the last word
1916 output += self.process_word()
1917 return output
1918
1919 def process_word(self):
1920 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001921 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001922 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001923 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001924 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1925 else:
1926 output = self.buffer.decode('ascii')
1927 if len(output) < self.o:
1928 output += '-'*self.o # pad out with hyphens
1929 if self.o:
1930 output = output[:self.o] # truncate to output length
1931 output += '.'
1932 self.buffer = bytearray()
1933 return output
1934
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001935 codecEnabled = False
1936
1937 @classmethod
1938 def lookupTestDecoder(cls, name):
1939 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001940 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001941 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001942 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001943 incrementalencoder=None,
1944 streamreader=None, streamwriter=None,
1945 incrementaldecoder=cls)
1946
1947# Register the previous decoder for testing.
1948# Disabled by default, tests will enable it.
1949codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1950
1951
Christian Heimes1a6387e2008-03-26 12:49:49 +00001952class StatefulIncrementalDecoderTest(unittest.TestCase):
1953 """
1954 Make sure the StatefulIncrementalDecoder actually works.
1955 """
1956
1957 test_cases = [
1958 # I=1, O=1 (fixed-length input == fixed-length output)
1959 (b'abcd', False, 'a.b.c.d.'),
1960 # I=0, O=0 (variable-length input, variable-length output)
1961 (b'oiabcd', True, 'abcd.'),
1962 # I=0, O=0 (should ignore extra periods)
1963 (b'oi...abcd...', True, 'abcd.'),
1964 # I=0, O=6 (variable-length input, fixed-length output)
1965 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1966 # I=2, O=6 (fixed-length input < fixed-length output)
1967 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1968 # I=6, O=3 (fixed-length input > fixed-length output)
1969 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1970 # I=0, then 3; O=29, then 15 (with longer output)
1971 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1972 'a----------------------------.' +
1973 'b----------------------------.' +
1974 'cde--------------------------.' +
1975 'abcdefghijabcde.' +
1976 'a.b------------.' +
1977 '.c.------------.' +
1978 'd.e------------.' +
1979 'k--------------.' +
1980 'l--------------.' +
1981 'm--------------.')
1982 ]
1983
Antoine Pitrou19690592009-06-12 20:14:08 +00001984 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001985 # Try a few one-shot test cases.
1986 for input, eof, output in self.test_cases:
1987 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001988 self.assertEqual(d.decode(input, eof), output)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001989
1990 # Also test an unfinished decode, followed by forcing EOF.
1991 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001992 self.assertEqual(d.decode(b'oiabcd'), '')
1993 self.assertEqual(d.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001994
1995class TextIOWrapperTest(unittest.TestCase):
1996
1997 def setUp(self):
1998 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1999 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00002000 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002001
2002 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002003 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002004
Antoine Pitrou19690592009-06-12 20:14:08 +00002005 def test_constructor(self):
2006 r = self.BytesIO(b"\xc3\xa9\n\n")
2007 b = self.BufferedReader(r, 1000)
2008 t = self.TextIOWrapper(b)
2009 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti2623a372010-11-21 13:34:58 +00002010 self.assertEqual(t.encoding, "latin1")
2011 self.assertEqual(t.line_buffering, False)
Antoine Pitrou19690592009-06-12 20:14:08 +00002012 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti2623a372010-11-21 13:34:58 +00002013 self.assertEqual(t.encoding, "utf8")
2014 self.assertEqual(t.line_buffering, True)
2015 self.assertEqual("\xe9\n", t.readline())
Antoine Pitrou19690592009-06-12 20:14:08 +00002016 self.assertRaises(TypeError, t.__init__, b, newline=42)
2017 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2018
Serhiy Storchakaabb7e652015-04-16 11:56:35 +03002019 def test_uninitialized(self):
2020 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2021 del t
2022 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2023 self.assertRaises(Exception, repr, t)
2024 self.assertRaisesRegexp((ValueError, AttributeError),
2025 'uninitialized|has no attribute',
2026 t.read, 0)
2027 t.__init__(self.MockRawIO())
2028 self.assertEqual(t.read(0), u'')
2029
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002030 def test_non_text_encoding_codecs_are_rejected(self):
2031 # Ensure the constructor complains if passed a codec that isn't
2032 # marked as a text encoding
2033 # http://bugs.python.org/issue20404
2034 r = self.BytesIO()
2035 b = self.BufferedWriter(r)
2036 with support.check_py3k_warnings():
2037 self.TextIOWrapper(b, encoding="hex_codec")
2038
Antoine Pitrou19690592009-06-12 20:14:08 +00002039 def test_detach(self):
2040 r = self.BytesIO()
2041 b = self.BufferedWriter(r)
2042 t = self.TextIOWrapper(b)
2043 self.assertIs(t.detach(), b)
2044
2045 t = self.TextIOWrapper(b, encoding="ascii")
2046 t.write("howdy")
2047 self.assertFalse(r.getvalue())
2048 t.detach()
2049 self.assertEqual(r.getvalue(), b"howdy")
2050 self.assertRaises(ValueError, t.detach)
2051
Benjamin Peterson53ae6142014-12-21 20:51:50 -06002052 # Operations independent of the detached stream should still work
2053 repr(t)
2054 self.assertEqual(t.encoding, "ascii")
2055 self.assertEqual(t.errors, "strict")
2056 self.assertFalse(t.line_buffering)
2057
Antoine Pitrou19690592009-06-12 20:14:08 +00002058 def test_repr(self):
2059 raw = self.BytesIO("hello".encode("utf-8"))
2060 b = self.BufferedReader(raw)
2061 t = self.TextIOWrapper(b, encoding="utf-8")
2062 modname = self.TextIOWrapper.__module__
2063 self.assertEqual(repr(t),
2064 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2065 raw.name = "dummy"
2066 self.assertEqual(repr(t),
2067 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
2068 raw.name = b"dummy"
2069 self.assertEqual(repr(t),
2070 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
2071
Benjamin Peterson53ae6142014-12-21 20:51:50 -06002072 t.buffer.detach()
2073 repr(t) # Should not raise an exception
2074
Antoine Pitrou19690592009-06-12 20:14:08 +00002075 def test_line_buffering(self):
2076 r = self.BytesIO()
2077 b = self.BufferedWriter(r, 1000)
2078 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
2079 t.write("X")
Ezio Melotti2623a372010-11-21 13:34:58 +00002080 self.assertEqual(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00002081 t.write("Y\nZ")
Ezio Melotti2623a372010-11-21 13:34:58 +00002082 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00002083 t.write("A\rB")
Ezio Melotti2623a372010-11-21 13:34:58 +00002084 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002085
Antoine Pitrou19690592009-06-12 20:14:08 +00002086 def test_encoding(self):
2087 # Check the encoding attribute is always set, and valid
2088 b = self.BytesIO()
2089 t = self.TextIOWrapper(b, encoding="utf8")
2090 self.assertEqual(t.encoding, "utf8")
2091 t = self.TextIOWrapper(b)
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03002092 self.assertIsNotNone(t.encoding)
Antoine Pitrou19690592009-06-12 20:14:08 +00002093 codecs.lookup(t.encoding)
2094
2095 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002096 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00002097 b = self.BytesIO(b"abc\n\xff\n")
2098 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002099 self.assertRaises(UnicodeError, t.read)
2100 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00002101 b = self.BytesIO(b"abc\n\xff\n")
2102 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002103 self.assertRaises(UnicodeError, t.read)
2104 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00002105 b = self.BytesIO(b"abc\n\xff\n")
2106 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti2623a372010-11-21 13:34:58 +00002107 self.assertEqual(t.read(), "abc\n\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002108 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00002109 b = self.BytesIO(b"abc\n\xff\n")
2110 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti2623a372010-11-21 13:34:58 +00002111 self.assertEqual(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002112
Antoine Pitrou19690592009-06-12 20:14:08 +00002113 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002114 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00002115 b = self.BytesIO()
2116 t = self.TextIOWrapper(b, encoding="ascii")
2117 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002118 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00002119 b = self.BytesIO()
2120 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2121 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002122 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00002123 b = self.BytesIO()
2124 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00002125 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00002126 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002127 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002128 self.assertEqual(b.getvalue(), b"abcdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002129 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00002130 b = self.BytesIO()
2131 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00002132 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00002133 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002134 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002135 self.assertEqual(b.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002136
Antoine Pitrou19690592009-06-12 20:14:08 +00002137 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002138 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2139
2140 tests = [
2141 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
2142 [ '', input_lines ],
2143 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2144 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2145 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
2146 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002147 encodings = (
2148 'utf-8', 'latin-1',
2149 'utf-16', 'utf-16-le', 'utf-16-be',
2150 'utf-32', 'utf-32-le', 'utf-32-be',
2151 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00002152
2153 # Try a range of buffer sizes to test the case where \r is the last
2154 # character in TextIOWrapper._pending_line.
2155 for encoding in encodings:
2156 # XXX: str.encode() should return bytes
2157 data = bytes(''.join(input_lines).encode(encoding))
2158 for do_reads in (False, True):
2159 for bufsize in range(1, 10):
2160 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002161 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2162 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00002163 encoding=encoding)
2164 if do_reads:
2165 got_lines = []
2166 while True:
2167 c2 = textio.read(2)
2168 if c2 == '':
2169 break
Ezio Melotti2623a372010-11-21 13:34:58 +00002170 self.assertEqual(len(c2), 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002171 got_lines.append(c2 + textio.readline())
2172 else:
2173 got_lines = list(textio)
2174
2175 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti2623a372010-11-21 13:34:58 +00002176 self.assertEqual(got_line, exp_line)
2177 self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002178
Antoine Pitrou19690592009-06-12 20:14:08 +00002179 def test_newlines_input(self):
2180 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002181 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2182 for newline, expected in [
2183 (None, normalized.decode("ascii").splitlines(True)),
2184 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00002185 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2186 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2187 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00002188 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00002189 buf = self.BytesIO(testdata)
2190 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti2623a372010-11-21 13:34:58 +00002191 self.assertEqual(txt.readlines(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002192 txt.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002193 self.assertEqual(txt.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002194
Antoine Pitrou19690592009-06-12 20:14:08 +00002195 def test_newlines_output(self):
2196 testdict = {
2197 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2198 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2199 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2200 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2201 }
2202 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2203 for newline, expected in tests:
2204 buf = self.BytesIO()
2205 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2206 txt.write("AAA\nB")
2207 txt.write("BB\nCCC\n")
2208 txt.write("X\rY\r\nZ")
2209 txt.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002210 self.assertEqual(buf.closed, False)
2211 self.assertEqual(buf.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00002212
2213 def test_destructor(self):
2214 l = []
2215 base = self.BytesIO
2216 class MyBytesIO(base):
2217 def close(self):
2218 l.append(self.getvalue())
2219 base.close(self)
2220 b = MyBytesIO()
2221 t = self.TextIOWrapper(b, encoding="ascii")
2222 t.write("abc")
2223 del t
2224 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00002225 self.assertEqual([b"abc"], l)
Antoine Pitrou19690592009-06-12 20:14:08 +00002226
2227 def test_override_destructor(self):
2228 record = []
2229 class MyTextIO(self.TextIOWrapper):
2230 def __del__(self):
2231 record.append(1)
2232 try:
2233 f = super(MyTextIO, self).__del__
2234 except AttributeError:
2235 pass
2236 else:
2237 f()
2238 def close(self):
2239 record.append(2)
2240 super(MyTextIO, self).close()
2241 def flush(self):
2242 record.append(3)
2243 super(MyTextIO, self).flush()
2244 b = self.BytesIO()
2245 t = MyTextIO(b, encoding="ascii")
2246 del t
2247 support.gc_collect()
2248 self.assertEqual(record, [1, 2, 3])
2249
2250 def test_error_through_destructor(self):
2251 # Test that the exception state is not modified by a destructor,
2252 # even if close() fails.
2253 rawio = self.CloseFailureIO()
2254 def f():
2255 self.TextIOWrapper(rawio).xyzzy
2256 with support.captured_output("stderr") as s:
2257 self.assertRaises(AttributeError, f)
2258 s = s.getvalue().strip()
2259 if s:
2260 # The destructor *may* have printed an unraisable error, check it
2261 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002262 self.assertTrue(s.startswith("Exception IOError: "), s)
2263 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002264
2265 # Systematic tests of the text I/O API
2266
Antoine Pitrou19690592009-06-12 20:14:08 +00002267 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002268 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2269 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00002270 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002271 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002272 self.assertEqual(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002273 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002274 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002275 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002276 self.assertEqual(f.tell(), 0)
2277 self.assertEqual(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002278 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002279 self.assertEqual(f.seek(0), 0)
2280 self.assertEqual(f.read(None), "abc")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002281 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002282 self.assertEqual(f.read(2), "ab")
2283 self.assertEqual(f.read(1), "c")
2284 self.assertEqual(f.read(1), "")
2285 self.assertEqual(f.read(), "")
2286 self.assertEqual(f.tell(), cookie)
2287 self.assertEqual(f.seek(0), 0)
2288 self.assertEqual(f.seek(0, 2), cookie)
2289 self.assertEqual(f.write("def"), 3)
2290 self.assertEqual(f.seek(cookie), cookie)
2291 self.assertEqual(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002292 if enc.startswith("utf"):
2293 self.multi_line_test(f, enc)
2294 f.close()
2295
2296 def multi_line_test(self, f, enc):
2297 f.seek(0)
2298 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00002299 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002300 wlines = []
2301 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2302 chars = []
2303 for i in range(size):
2304 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00002305 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002306 wlines.append((f.tell(), line))
2307 f.write(line)
2308 f.seek(0)
2309 rlines = []
2310 while True:
2311 pos = f.tell()
2312 line = f.readline()
2313 if not line:
2314 break
2315 rlines.append((pos, line))
Ezio Melotti2623a372010-11-21 13:34:58 +00002316 self.assertEqual(rlines, wlines)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002317
Antoine Pitrou19690592009-06-12 20:14:08 +00002318 def test_telling(self):
2319 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002320 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002321 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002322 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002323 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002324 p2 = f.tell()
2325 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002326 self.assertEqual(f.tell(), p0)
2327 self.assertEqual(f.readline(), "\xff\n")
2328 self.assertEqual(f.tell(), p1)
2329 self.assertEqual(f.readline(), "\xff\n")
2330 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002331 f.seek(0)
2332 for line in f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002333 self.assertEqual(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002334 self.assertRaises(IOError, f.tell)
Ezio Melotti2623a372010-11-21 13:34:58 +00002335 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002336 f.close()
2337
Antoine Pitrou19690592009-06-12 20:14:08 +00002338 def test_seeking(self):
2339 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00002340 prefix_size = chunk_size - 2
2341 u_prefix = "a" * prefix_size
2342 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melotti2623a372010-11-21 13:34:58 +00002343 self.assertEqual(len(u_prefix), len(prefix))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002344 u_suffix = "\u8888\n"
2345 suffix = bytes(u_suffix.encode("utf-8"))
2346 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00002347 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002348 f.write(line*2)
2349 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002350 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002351 s = f.read(prefix_size)
Ezio Melotti2623a372010-11-21 13:34:58 +00002352 self.assertEqual(s, prefix.decode("ascii"))
2353 self.assertEqual(f.tell(), prefix_size)
2354 self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002355
Antoine Pitrou19690592009-06-12 20:14:08 +00002356 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002357 # Regression test for a specific bug
2358 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00002359 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002360 f.write(data)
2361 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002362 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002363 f._CHUNK_SIZE # Just test that it exists
2364 f._CHUNK_SIZE = 2
2365 f.readline()
2366 f.tell()
2367
Antoine Pitrou19690592009-06-12 20:14:08 +00002368 def test_seek_and_tell(self):
2369 #Test seek/tell using the StatefulIncrementalDecoder.
2370 # Make test faster by doing smaller seeks
2371 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00002372
Antoine Pitrou19690592009-06-12 20:14:08 +00002373 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002374 """Tell/seek to various points within a data stream and ensure
2375 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00002376 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00002377 f.write(data)
2378 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002379 f = self.open(support.TESTFN, encoding='test_decoder')
2380 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00002381 decoded = f.read()
2382 f.close()
2383
2384 for i in range(min_pos, len(decoded) + 1): # seek positions
2385 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00002386 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melotti2623a372010-11-21 13:34:58 +00002387 self.assertEqual(f.read(i), decoded[:i])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002388 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002389 self.assertEqual(f.read(j), decoded[i:i + j])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002390 f.seek(cookie)
Ezio Melotti2623a372010-11-21 13:34:58 +00002391 self.assertEqual(f.read(), decoded[i:])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002392 f.close()
2393
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002394 # Enable the test decoder.
2395 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00002396
2397 # Run the tests.
2398 try:
2399 # Try each test case.
2400 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00002401 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002402
2403 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00002404 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2405 offset = CHUNK_SIZE - len(input)//2
2406 prefix = b'.'*offset
2407 # Don't bother seeking into the prefix (takes too long).
2408 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00002409 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002410
2411 # Ensure our test decoder won't interfere with subsequent tests.
2412 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002413 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002414
Antoine Pitrou19690592009-06-12 20:14:08 +00002415 def test_encoded_writes(self):
2416 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002417 tests = ("utf-16",
2418 "utf-16-le",
2419 "utf-16-be",
2420 "utf-32",
2421 "utf-32-le",
2422 "utf-32-be")
2423 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002424 buf = self.BytesIO()
2425 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002426 # Check if the BOM is written only once (see issue1753).
2427 f.write(data)
2428 f.write(data)
2429 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002430 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002431 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002432 self.assertEqual(f.read(), data * 2)
2433 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002434
Antoine Pitrou19690592009-06-12 20:14:08 +00002435 def test_unreadable(self):
2436 class UnReadable(self.BytesIO):
2437 def readable(self):
2438 return False
2439 txt = self.TextIOWrapper(UnReadable())
2440 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002441
Antoine Pitrou19690592009-06-12 20:14:08 +00002442 def test_read_one_by_one(self):
2443 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002444 reads = ""
2445 while True:
2446 c = txt.read(1)
2447 if not c:
2448 break
2449 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002450 self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002451
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002452 def test_readlines(self):
2453 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2454 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2455 txt.seek(0)
2456 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2457 txt.seek(0)
2458 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2459
Christian Heimes1a6387e2008-03-26 12:49:49 +00002460 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002461 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002462 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002463 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002464 reads = ""
2465 while True:
2466 c = txt.read(128)
2467 if not c:
2468 break
2469 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002470 self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002471
Antoine Pitroueadca1d2012-10-16 23:02:27 +02002472 def test_writelines(self):
2473 l = ['ab', 'cd', 'ef']
2474 buf = self.BytesIO()
2475 txt = self.TextIOWrapper(buf)
2476 txt.writelines(l)
2477 txt.flush()
2478 self.assertEqual(buf.getvalue(), b'abcdef')
2479
2480 def test_writelines_userlist(self):
2481 l = UserList(['ab', 'cd', 'ef'])
2482 buf = self.BytesIO()
2483 txt = self.TextIOWrapper(buf)
2484 txt.writelines(l)
2485 txt.flush()
2486 self.assertEqual(buf.getvalue(), b'abcdef')
2487
2488 def test_writelines_error(self):
2489 txt = self.TextIOWrapper(self.BytesIO())
2490 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2491 self.assertRaises(TypeError, txt.writelines, None)
2492 self.assertRaises(TypeError, txt.writelines, b'abc')
2493
Christian Heimes1a6387e2008-03-26 12:49:49 +00002494 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002495 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002496
2497 # read one char at a time
2498 reads = ""
2499 while True:
2500 c = txt.read(1)
2501 if not c:
2502 break
2503 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002504 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002505
2506 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002507 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002508 txt._CHUNK_SIZE = 4
2509
2510 reads = ""
2511 while True:
2512 c = txt.read(4)
2513 if not c:
2514 break
2515 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002516 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002517
2518 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002519 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002520 txt._CHUNK_SIZE = 4
2521
2522 reads = txt.read(4)
2523 reads += txt.read(4)
2524 reads += txt.readline()
2525 reads += txt.readline()
2526 reads += txt.readline()
Ezio Melotti2623a372010-11-21 13:34:58 +00002527 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002528
2529 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002530 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002531 txt._CHUNK_SIZE = 4
2532
2533 reads = txt.read(4)
2534 reads += txt.read()
Ezio Melotti2623a372010-11-21 13:34:58 +00002535 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002536
2537 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002538 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002539 txt._CHUNK_SIZE = 4
2540
2541 reads = txt.read(4)
2542 pos = txt.tell()
2543 txt.seek(0)
2544 txt.seek(pos)
Ezio Melotti2623a372010-11-21 13:34:58 +00002545 self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002546
2547 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002548 buffer = self.BytesIO(self.testdata)
2549 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002550
2551 self.assertEqual(buffer.seekable(), txt.seekable())
2552
Antoine Pitrou19690592009-06-12 20:14:08 +00002553 def test_append_bom(self):
2554 # The BOM is not written again when appending to a non-empty file
2555 filename = support.TESTFN
2556 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2557 with self.open(filename, 'w', encoding=charset) as f:
2558 f.write('aaa')
2559 pos = f.tell()
2560 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002561 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002562
2563 with self.open(filename, 'a', encoding=charset) as f:
2564 f.write('xxx')
2565 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002566 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002567
Antoine Pitrou19690592009-06-12 20:14:08 +00002568 def test_seek_bom(self):
2569 # Same test, but when seeking manually
2570 filename = support.TESTFN
2571 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2572 with self.open(filename, 'w', encoding=charset) as f:
2573 f.write('aaa')
2574 pos = f.tell()
2575 with self.open(filename, 'r+', encoding=charset) as f:
2576 f.seek(pos)
2577 f.write('zzz')
2578 f.seek(0)
2579 f.write('bbb')
2580 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002581 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002582
2583 def test_errors_property(self):
2584 with self.open(support.TESTFN, "w") as f:
2585 self.assertEqual(f.errors, "strict")
2586 with self.open(support.TESTFN, "w", errors="replace") as f:
2587 self.assertEqual(f.errors, "replace")
2588
Victor Stinner6a102812010-04-27 23:55:59 +00002589 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002590 def test_threads_write(self):
2591 # Issue6750: concurrent writes could duplicate data
2592 event = threading.Event()
2593 with self.open(support.TESTFN, "w", buffering=1) as f:
2594 def run(n):
2595 text = "Thread%03d\n" % n
2596 event.wait()
2597 f.write(text)
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03002598 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002599 for x in range(20)]
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03002600 with support.start_threads(threads, event.set):
2601 time.sleep(0.02)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002602 with self.open(support.TESTFN) as f:
2603 content = f.read()
2604 for n in range(20):
Ezio Melotti2623a372010-11-21 13:34:58 +00002605 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002606
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002607 def test_flush_error_on_close(self):
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002608 # Test that text file is closed despite failed flush
2609 # and that flush() is called before file closed.
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002610 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002611 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002612 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002613 closed[:] = [txt.closed, txt.buffer.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002614 raise IOError()
2615 txt.flush = bad_flush
2616 self.assertRaises(IOError, txt.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -06002617 self.assertTrue(txt.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002618 self.assertTrue(txt.buffer.closed)
2619 self.assertTrue(closed) # flush() called
2620 self.assertFalse(closed[0]) # flush() called before file closed
2621 self.assertFalse(closed[1])
Serhiy Storchaka437d5352015-02-23 00:28:38 +02002622 txt.flush = lambda: None # break reference loop
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002623
2624 def test_multi_close(self):
2625 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2626 txt.close()
2627 txt.close()
2628 txt.close()
2629 self.assertRaises(ValueError, txt.flush)
2630
Antoine Pitroufc9ead62010-12-21 21:26:55 +00002631 def test_readonly_attributes(self):
2632 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2633 buf = self.BytesIO(self.testdata)
2634 with self.assertRaises((AttributeError, TypeError)):
2635 txt.buffer = buf
2636
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002637 def test_read_nonbytes(self):
2638 # Issue #17106
2639 # Crash when underlying read() returns non-bytes
2640 class NonbytesStream(self.StringIO):
2641 read1 = self.StringIO.read
2642 class NonbytesStream(self.StringIO):
2643 read1 = self.StringIO.read
2644 t = self.TextIOWrapper(NonbytesStream('a'))
2645 with self.maybeRaises(TypeError):
2646 t.read(1)
2647 t = self.TextIOWrapper(NonbytesStream('a'))
2648 with self.maybeRaises(TypeError):
2649 t.readline()
2650 t = self.TextIOWrapper(NonbytesStream('a'))
2651 self.assertEqual(t.read(), u'a')
2652
2653 def test_illegal_decoder(self):
2654 # Issue #17106
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002655 # Bypass the early encoding check added in issue 20404
2656 def _make_illegal_wrapper():
2657 quopri = codecs.lookup("quopri_codec")
2658 quopri._is_text_encoding = True
2659 try:
2660 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
2661 newline='\n', encoding="quopri_codec")
2662 finally:
2663 quopri._is_text_encoding = False
2664 return t
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002665 # Crash when decoder returns non-string
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002666 with support.check_py3k_warnings():
2667 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2668 encoding='quopri_codec')
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002669 with self.maybeRaises(TypeError):
2670 t.read(1)
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002671 with support.check_py3k_warnings():
2672 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2673 encoding='quopri_codec')
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002674 with self.maybeRaises(TypeError):
2675 t.readline()
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002676 with support.check_py3k_warnings():
2677 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2678 encoding='quopri_codec')
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002679 with self.maybeRaises(TypeError):
2680 t.read()
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002681 #else:
2682 #t = _make_illegal_wrapper()
2683 #self.assertRaises(TypeError, t.read, 1)
2684 #t = _make_illegal_wrapper()
2685 #self.assertRaises(TypeError, t.readline)
2686 #t = _make_illegal_wrapper()
2687 #self.assertRaises(TypeError, t.read)
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002688
2689
Antoine Pitrou19690592009-06-12 20:14:08 +00002690class CTextIOWrapperTest(TextIOWrapperTest):
2691
2692 def test_initialization(self):
2693 r = self.BytesIO(b"\xc3\xa9\n\n")
2694 b = self.BufferedReader(r, 1000)
2695 t = self.TextIOWrapper(b)
2696 self.assertRaises(TypeError, t.__init__, b, newline=42)
2697 self.assertRaises(ValueError, t.read)
2698 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2699 self.assertRaises(ValueError, t.read)
2700
Benjamin Peterson53ae6142014-12-21 20:51:50 -06002701 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2702 self.assertRaises(Exception, repr, t)
2703
Antoine Pitrou19690592009-06-12 20:14:08 +00002704 def test_garbage_collection(self):
2705 # C TextIOWrapper objects are collected, and collecting them flushes
2706 # all data to disk.
2707 # The Python version has __del__, so it ends in gc.garbage instead.
2708 rawio = io.FileIO(support.TESTFN, "wb")
2709 b = self.BufferedWriter(rawio)
2710 t = self.TextIOWrapper(b, encoding="ascii")
2711 t.write("456def")
2712 t.x = t
2713 wr = weakref.ref(t)
2714 del t
2715 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03002716 self.assertIsNone(wr(), wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002717 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002718 self.assertEqual(f.read(), b"456def")
2719
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002720 def test_rwpair_cleared_before_textio(self):
2721 # Issue 13070: TextIOWrapper's finalization would crash when called
2722 # after the reference to the underlying BufferedRWPair's writer got
2723 # cleared by the GC.
2724 for i in range(1000):
2725 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2726 t1 = self.TextIOWrapper(b1, encoding="ascii")
2727 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2728 t2 = self.TextIOWrapper(b2, encoding="ascii")
2729 # circular references
2730 t1.buddy = t2
2731 t2.buddy = t1
2732 support.gc_collect()
2733
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002734 maybeRaises = unittest.TestCase.assertRaises
2735
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002736
Antoine Pitrou19690592009-06-12 20:14:08 +00002737class PyTextIOWrapperTest(TextIOWrapperTest):
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002738 @contextlib.contextmanager
2739 def maybeRaises(self, *args, **kwds):
2740 yield
Antoine Pitrou19690592009-06-12 20:14:08 +00002741
2742
2743class IncrementalNewlineDecoderTest(unittest.TestCase):
2744
2745 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002746 # UTF-8 specific tests for a newline decoder
2747 def _check_decode(b, s, **kwargs):
2748 # We exercise getstate() / setstate() as well as decode()
2749 state = decoder.getstate()
Ezio Melotti2623a372010-11-21 13:34:58 +00002750 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002751 decoder.setstate(state)
Ezio Melotti2623a372010-11-21 13:34:58 +00002752 self.assertEqual(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002753
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002754 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002755
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002756 _check_decode(b'\xe8', "")
2757 _check_decode(b'\xa2', "")
2758 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002759
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002760 _check_decode(b'\xe8', "")
2761 _check_decode(b'\xa2', "")
2762 _check_decode(b'\x88', "\u8888")
2763
2764 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002765 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2766
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002767 decoder.reset()
2768 _check_decode(b'\n', "\n")
2769 _check_decode(b'\r', "")
2770 _check_decode(b'', "\n", final=True)
2771 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002772
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002773 _check_decode(b'\r', "")
2774 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002775
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002776 _check_decode(b'\r\r\n', "\n\n")
2777 _check_decode(b'\r', "")
2778 _check_decode(b'\r', "\n")
2779 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002780
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002781 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2782 _check_decode(b'\xe8\xa2\x88', "\u8888")
2783 _check_decode(b'\n', "\n")
2784 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2785 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002786
Antoine Pitrou19690592009-06-12 20:14:08 +00002787 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002788 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002789 if encoding is not None:
2790 encoder = codecs.getincrementalencoder(encoding)()
2791 def _decode_bytewise(s):
2792 # Decode one byte at a time
2793 for b in encoder.encode(s):
2794 result.append(decoder.decode(b))
2795 else:
2796 encoder = None
2797 def _decode_bytewise(s):
2798 # Decode one char at a time
2799 for c in s:
2800 result.append(decoder.decode(c))
Ezio Melotti2623a372010-11-21 13:34:58 +00002801 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002802 _decode_bytewise("abc\n\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002803 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002804 _decode_bytewise("\nabc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002805 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002806 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002807 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002808 _decode_bytewise("abc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002809 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002810 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002811 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002812 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002813 input = "abc"
2814 if encoder is not None:
2815 encoder.reset()
2816 input = encoder.encode(input)
Ezio Melotti2623a372010-11-21 13:34:58 +00002817 self.assertEqual(decoder.decode(input), "abc")
2818 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002819
2820 def test_newline_decoder(self):
2821 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002822 # None meaning the IncrementalNewlineDecoder takes unicode input
2823 # rather than bytes input
2824 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002825 'utf-16', 'utf-16-le', 'utf-16-be',
2826 'utf-32', 'utf-32-le', 'utf-32-be',
2827 )
2828 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002829 decoder = enc and codecs.getincrementaldecoder(enc)()
2830 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2831 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002832 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002833 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2834 self.check_newline_decoding_utf8(decoder)
2835
2836 def test_newline_bytes(self):
2837 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2838 def _check(dec):
Ezio Melotti2623a372010-11-21 13:34:58 +00002839 self.assertEqual(dec.newlines, None)
2840 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2841 self.assertEqual(dec.newlines, None)
2842 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2843 self.assertEqual(dec.newlines, None)
Antoine Pitrou19690592009-06-12 20:14:08 +00002844 dec = self.IncrementalNewlineDecoder(None, translate=False)
2845 _check(dec)
2846 dec = self.IncrementalNewlineDecoder(None, translate=True)
2847 _check(dec)
2848
2849class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2850 pass
2851
2852class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2853 pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002854
Christian Heimes1a6387e2008-03-26 12:49:49 +00002855
2856# XXX Tests for open()
2857
2858class MiscIOTest(unittest.TestCase):
2859
Benjamin Petersonad100c32008-11-20 22:06:22 +00002860 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002861 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002862
Antoine Pitrou19690592009-06-12 20:14:08 +00002863 def test___all__(self):
2864 for name in self.io.__all__:
2865 obj = getattr(self.io, name, None)
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03002866 self.assertIsNotNone(obj, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002867 if name == "open":
2868 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002869 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002870 self.assertTrue(issubclass(obj, Exception), name)
2871 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002872 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002873
Benjamin Petersonad100c32008-11-20 22:06:22 +00002874 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002875 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002876 self.assertEqual(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002877 f.close()
2878
Antoine Pitrou19690592009-06-12 20:14:08 +00002879 f = self.open(support.TESTFN, "U")
Ezio Melotti2623a372010-11-21 13:34:58 +00002880 self.assertEqual(f.name, support.TESTFN)
2881 self.assertEqual(f.buffer.name, support.TESTFN)
2882 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2883 self.assertEqual(f.mode, "U")
2884 self.assertEqual(f.buffer.mode, "rb")
2885 self.assertEqual(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002886 f.close()
2887
Antoine Pitrou19690592009-06-12 20:14:08 +00002888 f = self.open(support.TESTFN, "w+")
Ezio Melotti2623a372010-11-21 13:34:58 +00002889 self.assertEqual(f.mode, "w+")
2890 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2891 self.assertEqual(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002892
Antoine Pitrou19690592009-06-12 20:14:08 +00002893 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melotti2623a372010-11-21 13:34:58 +00002894 self.assertEqual(g.mode, "wb")
2895 self.assertEqual(g.raw.mode, "wb")
2896 self.assertEqual(g.name, f.fileno())
2897 self.assertEqual(g.raw.name, f.fileno())
Benjamin Petersonad100c32008-11-20 22:06:22 +00002898 f.close()
2899 g.close()
2900
Antoine Pitrou19690592009-06-12 20:14:08 +00002901 def test_io_after_close(self):
2902 for kwargs in [
2903 {"mode": "w"},
2904 {"mode": "wb"},
2905 {"mode": "w", "buffering": 1},
2906 {"mode": "w", "buffering": 2},
2907 {"mode": "wb", "buffering": 0},
2908 {"mode": "r"},
2909 {"mode": "rb"},
2910 {"mode": "r", "buffering": 1},
2911 {"mode": "r", "buffering": 2},
2912 {"mode": "rb", "buffering": 0},
2913 {"mode": "w+"},
2914 {"mode": "w+b"},
2915 {"mode": "w+", "buffering": 1},
2916 {"mode": "w+", "buffering": 2},
2917 {"mode": "w+b", "buffering": 0},
2918 ]:
2919 f = self.open(support.TESTFN, **kwargs)
2920 f.close()
2921 self.assertRaises(ValueError, f.flush)
2922 self.assertRaises(ValueError, f.fileno)
2923 self.assertRaises(ValueError, f.isatty)
2924 self.assertRaises(ValueError, f.__iter__)
2925 if hasattr(f, "peek"):
2926 self.assertRaises(ValueError, f.peek, 1)
2927 self.assertRaises(ValueError, f.read)
2928 if hasattr(f, "read1"):
2929 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinner5100a402011-05-25 22:15:36 +02002930 if hasattr(f, "readall"):
2931 self.assertRaises(ValueError, f.readall)
Antoine Pitrou19690592009-06-12 20:14:08 +00002932 if hasattr(f, "readinto"):
2933 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2934 self.assertRaises(ValueError, f.readline)
2935 self.assertRaises(ValueError, f.readlines)
2936 self.assertRaises(ValueError, f.seek, 0)
2937 self.assertRaises(ValueError, f.tell)
2938 self.assertRaises(ValueError, f.truncate)
2939 self.assertRaises(ValueError, f.write,
2940 b"" if "b" in kwargs['mode'] else "")
2941 self.assertRaises(ValueError, f.writelines, [])
2942 self.assertRaises(ValueError, next, f)
2943
2944 def test_blockingioerror(self):
2945 # Various BlockingIOError issues
2946 self.assertRaises(TypeError, self.BlockingIOError)
2947 self.assertRaises(TypeError, self.BlockingIOError, 1)
2948 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2949 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2950 b = self.BlockingIOError(1, "")
2951 self.assertEqual(b.characters_written, 0)
2952 class C(unicode):
2953 pass
2954 c = C("")
2955 b = self.BlockingIOError(1, c)
2956 c.b = b
2957 b.c = c
2958 wr = weakref.ref(c)
2959 del c, b
2960 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03002961 self.assertIsNone(wr(), wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002962
2963 def test_abcs(self):
2964 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002965 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2966 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2967 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2968 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002969
2970 def _check_abc_inheritance(self, abcmodule):
2971 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002972 self.assertIsInstance(f, abcmodule.IOBase)
2973 self.assertIsInstance(f, abcmodule.RawIOBase)
2974 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2975 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002976 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002977 self.assertIsInstance(f, abcmodule.IOBase)
2978 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2979 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2980 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002981 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002982 self.assertIsInstance(f, abcmodule.IOBase)
2983 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2984 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2985 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002986
2987 def test_abc_inheritance(self):
2988 # Test implementations inherit from their respective ABCs
2989 self._check_abc_inheritance(self)
2990
2991 def test_abc_inheritance_official(self):
2992 # Test implementations inherit from the official ABCs of the
2993 # baseline "io" module.
2994 self._check_abc_inheritance(io)
2995
Antoine Pitrou5aa7df32011-11-21 20:16:44 +01002996 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2997 def test_nonblock_pipe_write_bigbuf(self):
2998 self._test_nonblock_pipe_write(16*1024)
2999
3000 @unittest.skipUnless(fcntl, 'fcntl required for this test')
3001 def test_nonblock_pipe_write_smallbuf(self):
3002 self._test_nonblock_pipe_write(1024)
3003
3004 def _set_non_blocking(self, fd):
3005 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
3006 self.assertNotEqual(flags, -1)
3007 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
3008 self.assertEqual(res, 0)
3009
3010 def _test_nonblock_pipe_write(self, bufsize):
3011 sent = []
3012 received = []
3013 r, w = os.pipe()
3014 self._set_non_blocking(r)
3015 self._set_non_blocking(w)
3016
3017 # To exercise all code paths in the C implementation we need
3018 # to play with buffer sizes. For instance, if we choose a
3019 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3020 # then we will never get a partial write of the buffer.
3021 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3022 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3023
3024 with rf, wf:
3025 for N in 9999, 73, 7574:
3026 try:
3027 i = 0
3028 while True:
3029 msg = bytes([i % 26 + 97]) * N
3030 sent.append(msg)
3031 wf.write(msg)
3032 i += 1
3033
3034 except self.BlockingIOError as e:
3035 self.assertEqual(e.args[0], errno.EAGAIN)
3036 sent[-1] = sent[-1][:e.characters_written]
3037 received.append(rf.read())
3038 msg = b'BLOCKED'
3039 wf.write(msg)
3040 sent.append(msg)
3041
3042 while True:
3043 try:
3044 wf.flush()
3045 break
3046 except self.BlockingIOError as e:
3047 self.assertEqual(e.args[0], errno.EAGAIN)
3048 self.assertEqual(e.characters_written, 0)
3049 received.append(rf.read())
3050
3051 received += iter(rf.read, None)
3052
3053 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03003054 self.assertEqual(sent, received)
Antoine Pitrou5aa7df32011-11-21 20:16:44 +01003055 self.assertTrue(wf.closed)
3056 self.assertTrue(rf.closed)
3057
Antoine Pitrou19690592009-06-12 20:14:08 +00003058class CMiscIOTest(MiscIOTest):
3059 io = io
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03003060 shutdown_error = "RuntimeError: could not find io module state"
Antoine Pitrou19690592009-06-12 20:14:08 +00003061
3062class PyMiscIOTest(MiscIOTest):
3063 io = pyio
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03003064 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Petersonad100c32008-11-20 22:06:22 +00003065
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003066
3067@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3068class SignalsTest(unittest.TestCase):
3069
3070 def setUp(self):
3071 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3072
3073 def tearDown(self):
3074 signal.signal(signal.SIGALRM, self.oldalrm)
3075
3076 def alarm_interrupt(self, sig, frame):
Florent Xicluna3fa3b002010-09-13 08:20:19 +00003077 1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003078
3079 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinner49d495f2011-07-04 11:44:46 +02003080 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
3081 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003082 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3083 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou6439c002011-02-25 21:35:47 +00003084 invokes the signal handler, and bubbles up the exception raised
3085 in the latter."""
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003086 read_results = []
3087 def _read():
3088 s = os.read(r, 1)
3089 read_results.append(s)
3090 t = threading.Thread(target=_read)
3091 t.daemon = True
3092 r, w = os.pipe()
3093 try:
3094 wio = self.io.open(w, **fdopen_kwargs)
3095 t.start()
3096 signal.alarm(1)
3097 # Fill the pipe enough that the write will be blocking.
3098 # It will be interrupted by the timer armed above. Since the
3099 # other thread has read one byte, the low-level write will
3100 # return with a successful (partial) result rather than an EINTR.
3101 # The buffered IO layer must check for pending signal
3102 # handlers, which in this case will invoke alarm_interrupt().
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03003103 try:
3104 with self.assertRaises(ZeroDivisionError):
3105 wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1))
3106 finally:
3107 t.join()
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003108 # We got one byte, get another one and check that it isn't a
3109 # repeat of the first one.
3110 read_results.append(os.read(r, 1))
3111 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3112 finally:
3113 os.close(w)
3114 os.close(r)
3115 # This is deliberate. If we didn't close the file descriptor
3116 # before closing wio, wio would try to flush its internal
3117 # buffer, and block again.
3118 try:
3119 wio.close()
3120 except IOError as e:
3121 if e.errno != errno.EBADF:
3122 raise
3123
3124 def test_interrupted_write_unbuffered(self):
3125 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3126
3127 def test_interrupted_write_buffered(self):
3128 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3129
3130 def test_interrupted_write_text(self):
3131 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3132
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003133 def check_reentrant_write(self, data, **fdopen_kwargs):
3134 def on_alarm(*args):
3135 # Will be called reentrantly from the same thread
3136 wio.write(data)
Victor Stinner4c41f842011-07-05 13:29:26 +02003137 1//0
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003138 signal.signal(signal.SIGALRM, on_alarm)
3139 r, w = os.pipe()
3140 wio = self.io.open(w, **fdopen_kwargs)
3141 try:
3142 signal.alarm(1)
3143 # Either the reentrant call to wio.write() fails with RuntimeError,
3144 # or the signal handler raises ZeroDivisionError.
3145 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3146 while 1:
3147 for i in range(100):
3148 wio.write(data)
3149 wio.flush()
3150 # Make sure the buffer doesn't fill up and block further writes
3151 os.read(r, len(data) * 100)
3152 exc = cm.exception
3153 if isinstance(exc, RuntimeError):
3154 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3155 finally:
3156 wio.close()
3157 os.close(r)
3158
3159 def test_reentrant_write_buffered(self):
3160 self.check_reentrant_write(b"xy", mode="wb")
3161
3162 def test_reentrant_write_text(self):
3163 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3164
Antoine Pitrou6439c002011-02-25 21:35:47 +00003165 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3166 """Check that a buffered read, when it gets interrupted (either
3167 returning a partial result or EINTR), properly invokes the signal
3168 handler and retries if the latter returned successfully."""
3169 r, w = os.pipe()
3170 fdopen_kwargs["closefd"] = False
3171 def alarm_handler(sig, frame):
3172 os.write(w, b"bar")
3173 signal.signal(signal.SIGALRM, alarm_handler)
3174 try:
3175 rio = self.io.open(r, **fdopen_kwargs)
3176 os.write(w, b"foo")
3177 signal.alarm(1)
3178 # Expected behaviour:
3179 # - first raw read() returns partial b"foo"
3180 # - second raw read() returns EINTR
3181 # - third raw read() returns b"bar"
3182 self.assertEqual(decode(rio.read(6)), "foobar")
3183 finally:
3184 rio.close()
3185 os.close(w)
3186 os.close(r)
3187
3188 def test_interrupterd_read_retry_buffered(self):
3189 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3190 mode="rb")
3191
3192 def test_interrupterd_read_retry_text(self):
3193 self.check_interrupted_read_retry(lambda x: x,
3194 mode="r")
3195
3196 @unittest.skipUnless(threading, 'Threading required for this test.')
3197 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3198 """Check that a buffered write, when it gets interrupted (either
3199 returning a partial result or EINTR), properly invokes the signal
3200 handler and retries if the latter returned successfully."""
3201 select = support.import_module("select")
3202 # A quantity that exceeds the buffer size of an anonymous pipe's
3203 # write end.
Antoine Pitrou68915d72013-04-24 23:31:38 +02003204 N = support.PIPE_MAX_SIZE
Antoine Pitrou6439c002011-02-25 21:35:47 +00003205 r, w = os.pipe()
3206 fdopen_kwargs["closefd"] = False
3207 # We need a separate thread to read from the pipe and allow the
3208 # write() to finish. This thread is started after the SIGALRM is
3209 # received (forcing a first EINTR in write()).
3210 read_results = []
3211 write_finished = False
Serhiy Storchaka53ea1622015-03-28 20:38:48 +02003212 error = [None]
Antoine Pitrou6439c002011-02-25 21:35:47 +00003213 def _read():
Serhiy Storchaka53ea1622015-03-28 20:38:48 +02003214 try:
3215 while not write_finished:
3216 while r in select.select([r], [], [], 1.0)[0]:
3217 s = os.read(r, 1024)
3218 read_results.append(s)
3219 except BaseException as exc:
3220 error[0] = exc
Antoine Pitrou6439c002011-02-25 21:35:47 +00003221 t = threading.Thread(target=_read)
3222 t.daemon = True
3223 def alarm1(sig, frame):
3224 signal.signal(signal.SIGALRM, alarm2)
3225 signal.alarm(1)
3226 def alarm2(sig, frame):
3227 t.start()
3228 signal.signal(signal.SIGALRM, alarm1)
3229 try:
3230 wio = self.io.open(w, **fdopen_kwargs)
3231 signal.alarm(1)
3232 # Expected behaviour:
3233 # - first raw write() is partial (because of the limited pipe buffer
3234 # and the first alarm)
3235 # - second raw write() returns EINTR (because of the second alarm)
3236 # - subsequent write()s are successful (either partial or complete)
3237 self.assertEqual(N, wio.write(item * N))
3238 wio.flush()
3239 write_finished = True
3240 t.join()
Serhiy Storchaka53ea1622015-03-28 20:38:48 +02003241
3242 self.assertIsNone(error[0])
Antoine Pitrou6439c002011-02-25 21:35:47 +00003243 self.assertEqual(N, sum(len(x) for x in read_results))
3244 finally:
3245 write_finished = True
3246 os.close(w)
3247 os.close(r)
3248 # This is deliberate. If we didn't close the file descriptor
3249 # before closing wio, wio would try to flush its internal
3250 # buffer, and could block (in case of failure).
3251 try:
3252 wio.close()
3253 except IOError as e:
3254 if e.errno != errno.EBADF:
3255 raise
3256
3257 def test_interrupterd_write_retry_buffered(self):
3258 self.check_interrupted_write_retry(b"x", mode="wb")
3259
3260 def test_interrupterd_write_retry_text(self):
3261 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3262
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003263
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003264class CSignalsTest(SignalsTest):
3265 io = io
3266
3267class PySignalsTest(SignalsTest):
3268 io = pyio
3269
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003270 # Handling reentrancy issues would slow down _pyio even more, so the
3271 # tests are disabled.
3272 test_reentrant_write_buffered = None
3273 test_reentrant_write_text = None
3274
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003275
Christian Heimes1a6387e2008-03-26 12:49:49 +00003276def test_main():
Antoine Pitrou19690592009-06-12 20:14:08 +00003277 tests = (CIOTest, PyIOTest,
3278 CBufferedReaderTest, PyBufferedReaderTest,
3279 CBufferedWriterTest, PyBufferedWriterTest,
3280 CBufferedRWPairTest, PyBufferedRWPairTest,
3281 CBufferedRandomTest, PyBufferedRandomTest,
3282 StatefulIncrementalDecoderTest,
3283 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3284 CTextIOWrapperTest, PyTextIOWrapperTest,
3285 CMiscIOTest, PyMiscIOTest,
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003286 CSignalsTest, PySignalsTest,
Antoine Pitrou19690592009-06-12 20:14:08 +00003287 )
3288
3289 # Put the namespaces of the IO module we are testing and some useful mock
3290 # classes in the __dict__ of each test.
3291 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou6391b342010-09-14 18:48:19 +00003292 MockNonBlockWriterIO, MockRawIOWithoutRead)
Antoine Pitrou19690592009-06-12 20:14:08 +00003293 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3294 c_io_ns = dict((name, getattr(io, name)) for name in all_members)
3295 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
3296 globs = globals()
3297 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3298 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3299 # Avoid turning open into a bound method.
3300 py_io_ns["open"] = pyio.OpenWrapper
3301 for test in tests:
3302 if test.__name__.startswith("C"):
3303 for name, obj in c_io_ns.items():
3304 setattr(test, name, obj)
3305 elif test.__name__.startswith("Py"):
3306 for name, obj in py_io_ns.items():
3307 setattr(test, name, obj)
3308
3309 support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00003310
3311if __name__ == "__main__":
Antoine Pitrou19690592009-06-12 20:14:08 +00003312 test_main()