blob: e26ffba22966cdd81e81c2a4ddd642e59577c47a [file] [log] [blame]
"""Unit tests for the io module."""

# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +030032import warnings
Antoine Pitrou19690592009-06-12 20:14:08 +000033import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000034import signal
35import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000036from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000037from collections import deque
Antoine Pitrou78e761e2012-10-16 22:57:11 +020038from UserList import UserList
Antoine Pitrou19690592009-06-12 20:14:08 +000039from test import test_support as support
Serhiy Storchaka354d50e2013-02-03 17:10:42 +020040import contextlib
Christian Heimes1a6387e2008-03-26 12:49:49 +000041
42import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000043import io # C implementation of io
44import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000045try:
46 import threading
47except ImportError:
48 threading = None
Antoine Pitrou5aa7df32011-11-21 20:16:44 +010049try:
50 import fcntl
51except ImportError:
52 fcntl = None
Antoine Pitrou19690592009-06-12 20:14:08 +000053
54__metaclass__ = type
55bytes = support.py3k_bytes
56
def byteslike(*pos, **kw):
    """Return a memoryview over a new bytearray built from the arguments.

    All positional and keyword arguments are forwarded unchanged to
    ``bytearray``.
    """
    backing = bytearray(*pos, **kw)
    return memoryview(backing)
59
Antoine Pitrou19690592009-06-12 20:14:08 +000060def _default_chunk_size():
61 """Get the default TextIOWrapper chunk size"""
62 with io.open(__file__, "r", encoding="latin1") as f:
63 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000064
65
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto()."""

    def __init__(self, read_stack=()):
        # Chunks still to be handed out by readinto(); a None entry makes
        # readinto() return None once.
        self._read_stack = list(read_stack)
        # Every payload passed to write(), in order.
        self._write_stack = []
        # Number of read calls, and how many of them found the stack empty.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        # Record a bytes copy of the payload and report it fully written.
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next queued chunk into *buf*.

        Returns the number of bytes stored, None when the next queued item
        is None, and 0 when nothing is queued.  A chunk larger than *buf*
        is split: the part that did not fit stays at the head of the queue.
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            self._read_stack.pop(0)
            return None
        size = len(chunk)
        if size > capacity:
            # Fill the whole buffer and keep the remainder for later.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        self._read_stack.pop(0)
        buf[:size] = chunk
        return size

    def truncate(self, pos=None):
        return pos
121
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead layered on the C implementation's RawIOBase."""


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead layered on the Python implementation's RawIOBase."""
127
128
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus an explicit read() method."""

    def read(self, n=None):
        """Return the next queued chunk, or b"" once the queue is empty.

        *n* is accepted for interface compatibility but ignored: each call
        hands out one whole queued item (which may be None).
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue counts as an extraneous read; any
            # other exception is a real problem and must propagate.
            self._extraneous_reads += 1
            return b""
138
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO layered on the C implementation's RawIOBase."""


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO layered on the Python implementation's RawIOBase."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000144
145
class MisbehavedRawIO(MockRawIO):
    """A MockRawIO whose methods report deliberately wrong results."""

    def write(self, b):
        # Claim twice as many bytes as were actually recorded.
        written = MockRawIO.write(self, b)
        return written * 2

    def read(self, n=None):
        # Hand back the chunk duplicated.
        chunk = MockRawIO.read(self, n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Perform the real transfer, then report five times the buffer size.
        MockRawIO.readinto(self, buf)
        return 5 * len(buf)
162
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO layered on the C implementation's RawIOBase."""


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO layered on the Python implementation's RawIOBase."""
168
169
class CloseFailureIO(MockRawIO):
    """A MockRawIO whose first close() raises IOError."""

    closed = 0

    def close(self):
        # Later calls are silent no-ops: the flag is set before raising.
        if self.closed:
            return
        self.closed = 1
        raise IOError
177
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO layered on the C implementation's RawIOBase."""


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO layered on the Python implementation's RawIOBase."""
183
184
class MockFileIO:
    """Mix-in that logs the outcome of every read()/readinto() call.

    It must be combined with a base class that supplies the real
    ``__init__``, ``read`` and ``readinto``; ``read_history`` collects one
    entry per call.
    """

    def __init__(self, data):
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        # Log the length of what was read, or None if the base returned None.
        result = super(MockFileIO, self).read(n)
        if result is None:
            entry = None
        else:
            entry = len(result)
        self.read_history.append(entry)
        return result

    def readinto(self, b):
        # Log the base class's return value as-is.
        count = super(MockFileIO, self).readinto(b)
        self.read_history.append(count)
        return count
Christian Heimes1a6387e2008-03-26 12:49:49 +0000200
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO layered on the C implementation's BytesIO."""


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO layered on the Python implementation's BytesIO."""
206
207
class MockNonBlockWriterIO:
    """Writable raw stream mock that can simulate a partial or blocked write."""

    def __init__(self):
        # Payloads accepted so far, and the byte sequence that stops a write.
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        # Return everything accepted so far and empty the log in place.
        joined = b"".join(self._write_stack)
        del self._write_stack[:]
        return joined

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Accept *b*, stopping short at the configured blocker if present.

        Returns the number of bytes accepted, or None when the blocker is
        the very first byte (in which case the blocker is also cleared).
        """
        data = bytes(b)
        if self._blocker_char:
            where = data.find(self._blocker_char)
            if where > 0:
                # write data up to the first blocker
                self._write_stack.append(data[:where])
                return where
            if where == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(data)
        return len(data)
251
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO layered on the C implementation's RawIOBase."""

    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO layered on the Python implementation's RawIOBase."""

    BlockingIOError = pyio.BlockingIOError
257
Christian Heimes1a6387e2008-03-26 12:49:49 +0000258
259class IOTest(unittest.TestCase):
260
Antoine Pitrou19690592009-06-12 20:14:08 +0000261 def setUp(self):
262 support.unlink(support.TESTFN)
263
Christian Heimes1a6387e2008-03-26 12:49:49 +0000264 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000265 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000266
267 def write_ops(self, f):
268 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000269 f.truncate(0)
270 self.assertEqual(f.tell(), 5)
271 f.seek(0)
272
273 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000274 self.assertEqual(f.seek(0), 0)
275 self.assertEqual(f.write(b"Hello."), 6)
276 self.assertEqual(f.tell(), 6)
277 self.assertEqual(f.seek(-1, 1), 5)
278 self.assertEqual(f.tell(), 5)
Martin Panterc9813d82016-06-03 05:59:20 +0000279 buffer = bytearray(b" world\n\n\n")
280 self.assertEqual(f.write(buffer), 9)
281 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Christian Heimes1a6387e2008-03-26 12:49:49 +0000282 self.assertEqual(f.seek(0), 0)
283 self.assertEqual(f.write(b"h"), 1)
284 self.assertEqual(f.seek(-1, 2), 13)
285 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000286
Christian Heimes1a6387e2008-03-26 12:49:49 +0000287 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000288 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000289 self.assertRaises(TypeError, f.seek, 0.0)
290
291 def read_ops(self, f, buffered=False):
292 data = f.read(5)
293 self.assertEqual(data, b"hello")
Martin Panterc9813d82016-06-03 05:59:20 +0000294 data = byteslike(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000295 self.assertEqual(f.readinto(data), 5)
Martin Panterc9813d82016-06-03 05:59:20 +0000296 self.assertEqual(data.tobytes(), b" worl")
297 data = bytearray(5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000298 self.assertEqual(f.readinto(data), 2)
299 self.assertEqual(len(data), 5)
300 self.assertEqual(data[:2], b"d\n")
301 self.assertEqual(f.seek(0), 0)
302 self.assertEqual(f.read(20), b"hello world\n")
303 self.assertEqual(f.read(1), b"")
Martin Panterc9813d82016-06-03 05:59:20 +0000304 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000305 self.assertEqual(f.seek(-6, 2), 6)
306 self.assertEqual(f.read(5), b"world")
307 self.assertEqual(f.read(0), b"")
Martin Panterc9813d82016-06-03 05:59:20 +0000308 self.assertEqual(f.readinto(byteslike()), 0)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000309 self.assertEqual(f.seek(-6, 1), 5)
310 self.assertEqual(f.read(5), b" worl")
311 self.assertEqual(f.tell(), 10)
312 self.assertRaises(TypeError, f.seek, 0.0)
313 if buffered:
314 f.seek(0)
315 self.assertEqual(f.read(), b"hello world\n")
316 f.seek(6)
317 self.assertEqual(f.read(), b"world\n")
318 self.assertEqual(f.read(), b"")
319
320 LARGE = 2**31
321
322 def large_file_ops(self, f):
323 assert f.readable()
324 assert f.writable()
325 self.assertEqual(f.seek(self.LARGE), self.LARGE)
326 self.assertEqual(f.tell(), self.LARGE)
327 self.assertEqual(f.write(b"xxx"), 3)
328 self.assertEqual(f.tell(), self.LARGE + 3)
329 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
330 self.assertEqual(f.truncate(), self.LARGE + 2)
331 self.assertEqual(f.tell(), self.LARGE + 2)
332 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
333 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000334 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000335 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
336 self.assertEqual(f.seek(-1, 2), self.LARGE)
337 self.assertEqual(f.read(2), b"x")
338
Antoine Pitrou19690592009-06-12 20:14:08 +0000339 def test_invalid_operations(self):
340 # Try writing on a file opened in read mode and vice-versa.
341 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000342 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000343 self.assertRaises(IOError, fp.read)
344 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000345 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000346 self.assertRaises(IOError, fp.write, b"blah")
347 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000348 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000349 self.assertRaises(IOError, fp.write, "blah")
350 self.assertRaises(IOError, fp.writelines, ["blah\n"])
351
Christian Heimes1a6387e2008-03-26 12:49:49 +0000352 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000353 with self.open(support.TESTFN, "wb", buffering=0) as f:
354 self.assertEqual(f.readable(), False)
355 self.assertEqual(f.writable(), True)
356 self.assertEqual(f.seekable(), True)
357 self.write_ops(f)
358 with self.open(support.TESTFN, "rb", buffering=0) as f:
359 self.assertEqual(f.readable(), True)
360 self.assertEqual(f.writable(), False)
361 self.assertEqual(f.seekable(), True)
362 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000363
364 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000365 with self.open(support.TESTFN, "wb") as f:
366 self.assertEqual(f.readable(), False)
367 self.assertEqual(f.writable(), True)
368 self.assertEqual(f.seekable(), True)
369 self.write_ops(f)
370 with self.open(support.TESTFN, "rb") as f:
371 self.assertEqual(f.readable(), True)
372 self.assertEqual(f.writable(), False)
373 self.assertEqual(f.seekable(), True)
374 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000375
376 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000377 with self.open(support.TESTFN, "wb") as f:
378 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
379 with self.open(support.TESTFN, "rb") as f:
380 self.assertEqual(f.readline(), b"abc\n")
381 self.assertEqual(f.readline(10), b"def\n")
382 self.assertEqual(f.readline(2), b"xy")
383 self.assertEqual(f.readline(4), b"zzy\n")
384 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000385 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000386 self.assertRaises(TypeError, f.readline, 5.3)
387 with self.open(support.TESTFN, "r") as f:
388 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000389
390 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000391 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000392 self.write_ops(f)
393 data = f.getvalue()
394 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000395 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000396 self.read_ops(f, True)
397
398 def test_large_file_ops(self):
399 # On Windows and Mac OSX this test comsumes large resources; It takes
400 # a long time to build the >2GB file and takes >2GB of disk space
401 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000402 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware1f702212013-12-10 14:09:20 -0600403 support.requires(
404 'largefile',
405 'test requires %s bytes and a long time to run' % self.LARGE)
Antoine Pitrou19690592009-06-12 20:14:08 +0000406 with self.open(support.TESTFN, "w+b", 0) as f:
407 self.large_file_ops(f)
408 with self.open(support.TESTFN, "w+b") as f:
409 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000410
411 def test_with_open(self):
412 for bufsize in (0, 1, 100):
413 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000414 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000415 f.write(b"xxx")
416 self.assertEqual(f.closed, True)
417 f = None
418 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000419 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000420 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000421 except ZeroDivisionError:
422 self.assertEqual(f.closed, True)
423 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000424 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000425
Antoine Pitroue741cc62009-01-21 00:45:36 +0000426 # issue 5008
427 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000428 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000429 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000430 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000431 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000432 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000433 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000434 with self.open(support.TESTFN, "a") as f:
Serhiy Storchakaea4d2872015-08-02 15:19:04 +0300435 self.assertGreater(f.tell(), 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000436
Christian Heimes1a6387e2008-03-26 12:49:49 +0000437 def test_destructor(self):
438 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000439 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000440 def __del__(self):
441 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000442 try:
443 f = super(MyFileIO, self).__del__
444 except AttributeError:
445 pass
446 else:
447 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000448 def close(self):
449 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000450 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000451 def flush(self):
452 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000453 super(MyFileIO, self).flush()
454 f = MyFileIO(support.TESTFN, "wb")
455 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000456 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000457 support.gc_collect()
458 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000459 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000460 self.assertEqual(f.read(), b"xxx")
461
462 def _check_base_destructor(self, base):
463 record = []
464 class MyIO(base):
465 def __init__(self):
466 # This exercises the availability of attributes on object
467 # destruction.
468 # (in the C version, close() is called by the tp_dealloc
469 # function, not by __del__)
470 self.on_del = 1
471 self.on_close = 2
472 self.on_flush = 3
473 def __del__(self):
474 record.append(self.on_del)
475 try:
476 f = super(MyIO, self).__del__
477 except AttributeError:
478 pass
479 else:
480 f()
481 def close(self):
482 record.append(self.on_close)
483 super(MyIO, self).close()
484 def flush(self):
485 record.append(self.on_flush)
486 super(MyIO, self).flush()
487 f = MyIO()
488 del f
489 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000490 self.assertEqual(record, [1, 2, 3])
491
Antoine Pitrou19690592009-06-12 20:14:08 +0000492 def test_IOBase_destructor(self):
493 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000494
Antoine Pitrou19690592009-06-12 20:14:08 +0000495 def test_RawIOBase_destructor(self):
496 self._check_base_destructor(self.RawIOBase)
497
498 def test_BufferedIOBase_destructor(self):
499 self._check_base_destructor(self.BufferedIOBase)
500
501 def test_TextIOBase_destructor(self):
502 self._check_base_destructor(self.TextIOBase)
503
504 def test_close_flushes(self):
505 with self.open(support.TESTFN, "wb") as f:
506 f.write(b"xxx")
507 with self.open(support.TESTFN, "rb") as f:
508 self.assertEqual(f.read(), b"xxx")
509
510 def test_array_writes(self):
511 a = array.array(b'i', range(10))
512 n = len(a.tostring())
513 with self.open(support.TESTFN, "wb", 0) as f:
514 self.assertEqual(f.write(a), n)
515 with self.open(support.TESTFN, "wb") as f:
516 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000517
518 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000519 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000520 closefd=False)
521
Antoine Pitrou19690592009-06-12 20:14:08 +0000522 def test_read_closed(self):
523 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000524 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000525 with self.open(support.TESTFN, "r") as f:
526 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000527 self.assertEqual(file.read(), "egg\n")
528 file.seek(0)
529 file.close()
530 self.assertRaises(ValueError, file.read)
531
532 def test_no_closefd_with_filename(self):
533 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000534 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000535
536 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000537 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000538 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000539 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000540 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000541 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000542 self.assertEqual(file.buffer.raw.closefd, False)
543
Antoine Pitrou19690592009-06-12 20:14:08 +0000544 def test_garbage_collection(self):
545 # FileIO objects are collected, and collecting them flushes
546 # all data to disk.
547 f = self.FileIO(support.TESTFN, "wb")
548 f.write(b"abcxxx")
549 f.f = f
550 wr = weakref.ref(f)
551 del f
552 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +0300553 self.assertIsNone(wr(), wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000554 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000555 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000556
Antoine Pitrou19690592009-06-12 20:14:08 +0000557 def test_unbounded_file(self):
558 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
559 zero = "/dev/zero"
560 if not os.path.exists(zero):
561 self.skipTest("{0} does not exist".format(zero))
562 if sys.maxsize > 0x7FFFFFFF:
563 self.skipTest("test can only run in a 32-bit address space")
564 if support.real_max_memuse < support._2G:
565 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000566 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000567 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000568 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000569 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000570 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000571 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000572
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200573 def check_flush_error_on_close(self, *args, **kwargs):
574 # Test that the file is closed despite failed flush
575 # and that flush() is called before file closed.
576 f = self.open(*args, **kwargs)
577 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000578 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200579 closed[:] = [f.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000580 raise IOError()
581 f.flush = bad_flush
582 self.assertRaises(IOError, f.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600583 self.assertTrue(f.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200584 self.assertTrue(closed) # flush() called
585 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchaka437d5352015-02-23 00:28:38 +0200586 f.flush = lambda: None # break reference loop
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200587
588 def test_flush_error_on_close(self):
589 # raw file
590 # Issue #5700: io.FileIO calls flush() after file closed
591 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
592 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
593 self.check_flush_error_on_close(fd, 'wb', buffering=0)
594 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
595 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
596 os.close(fd)
597 # buffered io
598 self.check_flush_error_on_close(support.TESTFN, 'wb')
599 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
600 self.check_flush_error_on_close(fd, 'wb')
601 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
602 self.check_flush_error_on_close(fd, 'wb', closefd=False)
603 os.close(fd)
604 # text io
605 self.check_flush_error_on_close(support.TESTFN, 'w')
606 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
607 self.check_flush_error_on_close(fd, 'w')
608 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
609 self.check_flush_error_on_close(fd, 'w', closefd=False)
610 os.close(fd)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000611
612 def test_multi_close(self):
613 f = self.open(support.TESTFN, "wb", buffering=0)
614 f.close()
615 f.close()
616 f.close()
617 self.assertRaises(ValueError, f.flush)
618
Antoine Pitrou6391b342010-09-14 18:48:19 +0000619 def test_RawIOBase_read(self):
620 # Exercise the default RawIOBase.read() implementation (which calls
621 # readinto() internally).
622 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
623 self.assertEqual(rawio.read(2), b"ab")
624 self.assertEqual(rawio.read(2), b"c")
625 self.assertEqual(rawio.read(2), b"d")
626 self.assertEqual(rawio.read(2), None)
627 self.assertEqual(rawio.read(2), b"ef")
628 self.assertEqual(rawio.read(2), b"g")
629 self.assertEqual(rawio.read(2), None)
630 self.assertEqual(rawio.read(2), b"")
631
Hynek Schlawack877effc2012-05-25 09:24:18 +0200632 def test_fileio_closefd(self):
633 # Issue #4841
634 with self.open(__file__, 'rb') as f1, \
635 self.open(__file__, 'rb') as f2:
636 fileio = self.FileIO(f1.fileno(), closefd=False)
637 # .__init__() must not close f1
638 fileio.__init__(f2.fileno(), closefd=False)
639 f1.readline()
640 # .close() must not close f2
641 fileio.close()
642 f2.readline()
643
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +0300644 def test_nonbuffered_textio(self):
645 with warnings.catch_warnings(record=True) as recorded:
646 with self.assertRaises(ValueError):
647 self.open(support.TESTFN, 'w', buffering=0)
648 support.gc_collect()
649 self.assertEqual(recorded, [])
650
651 def test_invalid_newline(self):
652 with warnings.catch_warnings(record=True) as recorded:
653 with self.assertRaises(ValueError):
654 self.open(support.TESTFN, 'w', newline='invalid')
655 support.gc_collect()
656 self.assertEqual(recorded, [])
657
Martin Panterc9813d82016-06-03 05:59:20 +0000658 def test_buffered_readinto_mixin(self):
659 # Test the implementation provided by BufferedIOBase
660 class Stream(self.BufferedIOBase):
661 def read(self, size):
662 return b"12345"
663 stream = Stream()
664 buffer = byteslike(5)
665 self.assertEqual(stream.readinto(buffer), 5)
666 self.assertEqual(buffer.tobytes(), b"12345")
667
Hynek Schlawack877effc2012-05-25 09:24:18 +0200668
class CIOTest(IOTest):
    # IOTest variant carrying one additional finalization regression test.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        cyclic = MyIO()
        cyclic.obj = cyclic
        ref = weakref.ref(cyclic)
        # Drop the class first, then the instance, and let the collector
        # reclaim the cycle.
        del MyIO
        del cyclic
        support.gc_collect()
        self.assertIsNone(ref(), ref)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000688
class PyIOTest(IOTest):
    # Same tests as IOTest, with the array-writing test disabled.
    test_array_writes = unittest.skip(
        "len(array.array) returns number of elements rather than bytelength"
    )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000693
694
class CommonBufferedTests:
    # Checks shared by the BufferedReader, BufferedWriter and BufferedRandom
    # test cases.  The methods read ``self.tp`` (the buffered class under
    # test) and the mock raw-stream classes (``self.MockRawIO``,
    # ``self.CloseFailureIO``) from the concrete test class.

    def test_detach(self):
        # detach() returns the wrapped raw stream; a second call fails.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        detached = buffered.detach()
        self.assertIs(detached, raw_stream)
        self.assertRaises(ValueError, buffered.detach)

        repr(buffered)  # Should still work

    def test_fileno(self):
        # With the mock raw stream, fileno() is expected to be 42.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)

        self.assertEqual(42, buffered.fileno())

    def test_invalid_args(self):
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        # Invalid whence
        for bad_whence in (-1, 3):
            self.assertRaises(ValueError, buffered.seek, 0, bad_whence)

    def test_override_destructor(self):
        # A subclass overriding __del__, close() and flush() must see them
        # invoked in that order (flush() only for writable objects) when the
        # object is collected.
        base = self.tp
        calls = []
        class MyBufferedIO(base):
            def __del__(self):
                calls.append(1)
                try:
                    parent_del = super(MyBufferedIO, self).__del__
                except AttributeError:
                    pass
                else:
                    parent_del()
            def close(self):
                calls.append(2)
                super(MyBufferedIO, self).close()
            def flush(self):
                calls.append(3)
                super(MyBufferedIO, self).flush()
        raw_stream = self.MockRawIO()
        buffered = MyBufferedIO(raw_stream)
        # Decide the expectation while the object is still alive.
        expected = [1, 2, 3] if buffered.writable() else [1, 2]
        del buffered
        support.gc_collect()
        self.assertEqual(calls, expected)

    def test_context_manager(self):
        # Test usability as a context manager
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        def enter_and_leave():
            with buffered:
                pass
        enter_and_leave()
        # The object is closed now, so entering it again must raise
        # a ValueError.
        self.assertRaises(ValueError, enter_and_leave)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        raw_stream = self.CloseFailureIO()
        def touch_missing_attribute():
            self.tp(raw_stream).xyzzy
        with support.captured_output("stderr") as captured:
            self.assertRaises(AttributeError, touch_missing_attribute)
        output = captured.getvalue().strip()
        if output:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(output.splitlines()), 1)
            self.assertTrue(output.startswith("Exception IOError: "), output)
            self.assertTrue(output.endswith(" ignored"), output)

    def test_repr(self):
        # repr() shows the class name, plus the raw stream's name once set.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        qualified = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(buffered), "<%s>" % qualified)
        raw_stream.name = "dummy"
        self.assertEqual(repr(buffered), "<%s name=u'dummy'>" % qualified)
        raw_stream.name = b"dummy"
        self.assertEqual(repr(buffered), "<%s name='dummy'>" % qualified)

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw_stream = self.MockRawIO()
        seen_closed = []
        def bad_flush():
            # Record the closed flags as they are at flush time.
            seen_closed[:] = [buffered.closed, raw_stream.closed]
            raise IOError()
        raw_stream.flush = bad_flush
        buffered = self.tp(raw_stream)
        self.assertRaises(IOError, buffered.close)  # exception not swallowed
        self.assertTrue(buffered.closed)
        self.assertTrue(raw_stream.closed)
        self.assertTrue(seen_closed)      # flush() called
        self.assertFalse(seen_closed[0])  # flush() called before file closed
        self.assertFalse(seen_closed[1])
        raw_stream.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and the raw close() fail, the error from the raw
        # close() is the one reported and the object is not marked closed.
        raw_stream = self.MockRawIO()
        def bad_flush():
            raise IOError('flush')
        def bad_close():
            raise IOError('close')
        raw_stream.close = bad_close
        buffered = self.tp(raw_stream)
        buffered.flush = bad_flush
        with self.assertRaises(IOError) as caught:  # exception not swallowed
            buffered.close()
        self.assertEqual(caught.exception.args, ('close',))
        self.assertFalse(buffered.closed)

    def test_multi_close(self):
        # Closing repeatedly is harmless; flushing afterwards is an error.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        for _ in range(3):
            buffered.close()
        self.assertRaises(ValueError, buffered.flush)

    def test_readonly_attributes(self):
        # The ``raw`` attribute cannot be rebound.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        replacement = self.MockRawIO()
        with self.assertRaises((AttributeError, TypeError)):
            buffered.raw = replacement
830
Christian Heimes1a6387e2008-03-26 12:49:49 +0000831
class SizeofTest:
    # Mixin checking that sys.getsizeof() of a buffered object grows by
    # exactly the difference between two buffer sizes.

    @support.cpython_only
    def test_sizeof(self):
        small_size = 4096
        large_size = 8192
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream, buffer_size=small_size)
        # Size of the object without its buffer.
        overhead = sys.getsizeof(buffered) - small_size
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream, buffer_size=large_size)
        self.assertEqual(sys.getsizeof(buffered), overhead + large_size)
844
845
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Tests for the buffered reader class stored in ``self.tp``.
    read_mode = "rb"

    def test_constructor(self):
        # __init__ may be called again with valid sizes; invalid buffer
        # sizes raise ValueError.
        raw_stream = self.MockRawIO([b"abc"])
        buffered = self.tp(raw_stream)
        buffered.__init__(raw_stream)
        buffered.__init__(raw_stream, buffer_size=1024)
        buffered.__init__(raw_stream, buffer_size=16)
        self.assertEqual(b"abc", buffered.read())
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, buffered.__init__, raw_stream,
                              buffer_size=bad_size)
        raw_stream = self.MockRawIO([b"abc"])
        buffered.__init__(raw_stream)
        self.assertEqual(b"abc", buffered.read())

    def test_uninitialized(self):
        # An object created through __new__ alone can be deleted safely and
        # rejects read() until __init__ has been run.
        buffered = self.tp.__new__(self.tp)
        del buffered
        buffered = self.tp.__new__(self.tp)
        self.assertRaisesRegexp((ValueError, AttributeError),
                                'uninitialized|has no attribute',
                                buffered.read, 0)
        buffered.__init__(self.MockRawIO())
        self.assertEqual(buffered.read(0), b'')

    def test_read(self):
        for size in (None, 7):
            raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
            buffered = self.tp(raw_stream)
            self.assertEqual(b"abcdefg", buffered.read(size))
        # Invalid args
        self.assertRaises(ValueError, buffered.read, -2)

    def test_read1(self):
        # read1() serves from the buffer when it can; the raw read counter
        # shows when a raw read was issued.
        raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
        buffered = self.tp(raw_stream)
        self.assertEqual(b"a", buffered.read(1))
        # (expected data, read1 size, expected raw read count afterwards)
        steps = (
            (b"b", 1, 1),
            (b"c", 100, 1),
            (b"d", 100, 2),
            (b"efg", 100, 3),
            (b"", 100, 4),
        )
        for expected, size, raw_reads in steps:
            self.assertEqual(expected, buffered.read1(size))
            self.assertEqual(raw_stream._reads, raw_reads)
        # Invalid args
        self.assertRaises(ValueError, buffered.read1, -1)

    def test_readinto(self):
        # readinto() fills the target from the front and leaves the rest of
        # the target untouched on a short read.
        raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
        buffered = self.tp(raw_stream)
        target = bytearray(2)
        # (expected byte count, expected target contents afterwards)
        steps = (
            (2, b"ab"),
            (2, b"cd"),
            (2, b"ef"),
            (1, b"gf"),
            (0, b"gf"),
        )
        for nread, contents in steps:
            self.assertEqual(buffered.readinto(target), nread)
            self.assertEqual(target, contents)

    def test_readlines(self):
        def make_reader():
            raw_stream = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(raw_stream)
        all_lines = [b"abc\n", b"d\n", b"ef"]
        self.assertEqual(make_reader().readlines(), all_lines)
        self.assertEqual(make_reader().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(make_reader().readlines(None), all_lines)

    def test_buffering(self):
        # For several buffer sizes, a sequence of buffered reads must return
        # the right data and produce the listed raw read sizes.
        data = b"abcdefghi"
        dlen = len(data)

        # [buffer size, buffered read sizes, expected raw read sizes]
        cases = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in cases:
            raw_stream = self.MockFileIO(data)
            buffered = self.tp(raw_stream, buffer_size=bufsize)
            offset = 0
            for nbytes in buf_read_sizes:
                expected = data[offset:offset+nbytes]
                self.assertEqual(buffered.read(nbytes), expected)
                offset += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(raw_stream.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        raw_stream = self.MockRawIO(
            (b"abc", b"d", None, b"efg", None, None, None))
        buffered = self.tp(raw_stream)
        self.assertEqual(b"abcd", buffered.read(6))
        self.assertEqual(b"e", buffered.read(1))
        self.assertEqual(b"fg", buffered.read())
        self.assertEqual(b"", buffered.peek(1))
        self.assertIsNone(buffered.read())
        self.assertEqual(b"", buffered.read())

        raw_stream = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", raw_stream.readall())
        self.assertIsNone(raw_stream.readall())

    def test_read_past_eof(self):
        # Asking for more than is available returns what there is.
        raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
        buffered = self.tp(raw_stream)

        self.assertEqual(b"abcdefg", buffered.read(9000))

    def test_read_all(self):
        raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
        buffered = self.tp(raw_stream)

        self.assertEqual(b"abcdefg", buffered.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode,
                           buffering=0) as raw:
                buffered = self.tp(raw, 8)
                errors = []
                results = []
                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for size in cycle([1, 19]):
                            chunk = buffered.read(size)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise
                threads = [threading.Thread(target=reader)
                           for _ in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                for i in range(256):
                    single = bytes(bytearray([i]))
                    self.assertEqual(combined.count(single), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # seek() and tell() report IOError on the misbehaving raw stream.
        raw_stream = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        buffered = self.tp(raw_stream)
        self.assertRaises(IOError, buffered.seek, 0)
        self.assertRaises(IOError, buffered.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            wanted = b"x" * n
            raw_stream = self.MockRawIO([wanted])
            buffered = self.tp(raw_stream, bufsize)
            self.assertEqual(buffered.read(n), wanted)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(raw_stream._extraneous_reads, 0,
                "failed for {}: {} != 0".format(
                    n, raw_stream._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            raw_stream = self.MockRawIO([b"x" * (n - 1), b"x"])
            buffered = self.tp(raw_stream, bufsize)
            self.assertEqual(buffered.read(n), wanted)
            self.assertEqual(raw_stream._extraneous_reads, 0,
                "failed for {}: {} != 0".format(
                    n, raw_stream._extraneous_reads))
1034
1035
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the reader tests against io.BufferedReader and adds checks that
    # are specific to it.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            raw_stream = self.MockRawIO()
            buffered = self.tp(raw_stream)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                              buffered.__init__, raw_stream, sys.maxsize)

    def test_initialization(self):
        # After every failed re-initialisation the object refuses to read.
        raw_stream = self.MockRawIO([b"abc"])
        buffered = self.tp(raw_stream)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, buffered.__init__, raw_stream,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, buffered.read)

    def test_misbehaved_io_read(self):
        raw_stream = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        buffered = self.tp(raw_stream)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, buffered.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        raw_stream = self.FileIO(support.TESTFN, "w+b")
        reader = self.tp(raw_stream)
        reader.f = reader  # make the object part of a reference cycle
        ref = weakref.ref(reader)
        del reader
        support.gc_collect()
        self.assertIsNone(ref(), ref)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegexp(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1081
1082
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the reader tests against the pyio implementation.
    tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001085
1086
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Tests for the buffered writer class stored in ``self.tp``.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be called again with valid sizes; invalid buffer
        # sizes raise ValueError and earlier writes remain in the raw stream.
        sink = self.MockRawIO()
        buffered = self.tp(sink)
        buffered.__init__(sink)
        buffered.__init__(sink, buffer_size=1024)
        buffered.__init__(sink, buffer_size=16)
        self.assertEqual(3, buffered.write(b"abc"))
        buffered.flush()
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, buffered.__init__, sink,
                              buffer_size=bad_size)
        buffered.__init__(sink)
        self.assertEqual(3, buffered.write(b"ghi"))
        buffered.flush()
        self.assertEqual(b"".join(sink._write_stack), b"abcghi")

    def test_uninitialized(self):
        # An object created through __new__ alone can be deleted safely and
        # rejects write() until __init__ has been run.
        buffered = self.tp.__new__(self.tp)
        del buffered
        buffered = self.tp.__new__(self.tp)
        self.assertRaisesRegexp((ValueError, AttributeError),
                                'uninitialized|has no attribute',
                                buffered.write, b'')
        buffered.__init__(self.MockRawIO())
        self.assertEqual(buffered.write(b''), 0)

    def test_detach_flush(self):
        # Pending data reaches the raw stream only when detach() is called.
        sink = self.MockRawIO()
        buffered = self.tp(sink)
        buffered.write(b"howdy!")
        self.assertFalse(sink._write_stack)
        buffered.detach()
        self.assertEqual(sink._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        sink = self.MockRawIO()
        buffered = self.tp(sink, 8)
        buffered.write(b"abc")
        self.assertFalse(sink._write_stack)
        payload = bytearray(b"def")
        buffered.write(payload)
        payload[:] = b"***"  # Overwrite our copy of the data
        buffered.flush()
        self.assertEqual(b"".join(sink._write_stack), b"abcdef")

    def test_write_overflow(self):
        sink = self.MockRawIO()
        buffered = self.tp(sink, 8)
        contents = b"abcdefghijklmnop"
        for start in range(0, len(contents), 3):
            buffered.write(contents[start:start+3])
        flushed = b"".join(sink._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # ``intermediate_func`` is called with the buffered object after
        # every write.
        contents = bytes(range(256)) * 1000
        written = 0
        sink = self.MockRawIO()
        buffered = self.tp(sink, 13)
        # Write sizes: repeat each N 15 times then proceed to N+1
        sizes = (size for size in count(1) for _ in range(15))
        total = len(contents)
        while written < total:
            size = min(next(sizes), total - written)
            chunk = contents[written:written+size]
            self.assertEqual(buffered.write(chunk), size)
            intermediate_func(buffered)
            written += size
        buffered.flush()
        self.assertEqual(contents,
            b"".join(sink._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seeking around and back between writes must not alter the output.
        def seek_absolute(bufio):
            here = bufio.tell()
            bufio.seek(here + 1, 0)
            bufio.seek(here - 1, 0)
            bufio.seek(here, 0)
        self.check_writes(seek_absolute)
        def seek_relative(bufio):
            here = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(here, 0)
        self.check_writes(seek_relative)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        buffered = self.tp(raw, 8)

        self.assertEqual(buffered.write(b"abcd"), 4)
        self.assertEqual(buffered.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(buffered.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            buffered.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as exc:
            written = exc.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(buffered.write(b"ABCDEFGHI"), 9)
        flushed = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(flushed.startswith(b"01234567A"), flushed)

    def test_write_and_rewind(self):
        # Writing after seeking back overwrites data that was already
        # written out.
        raw = io.BytesIO()
        buffered = self.tp(raw, 4)
        self.assertEqual(buffered.write(b"abcdef"), 6)
        self.assertEqual(buffered.tell(), 6)
        buffered.seek(0, 0)
        self.assertEqual(buffered.write(b"XY"), 2)
        buffered.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(buffered.write(b"123456"), 6)
        buffered.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        sink = self.MockRawIO()
        buffered = self.tp(sink, 8)
        buffered.write(b"abc")
        buffered.flush()
        self.assertEqual(b"abc", sink._write_stack[0])

    def test_writelines(self):
        lines = [b'ab', b'cd', b'ef']
        sink = self.MockRawIO()
        buffered = self.tp(sink, 8)
        buffered.writelines(lines)
        buffered.flush()
        self.assertEqual(b''.join(sink._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # writelines() also accepts a UserList.
        lines = UserList([b'ab', b'cd', b'ef'])
        sink = self.MockRawIO()
        buffered = self.tp(sink, 8)
        buffered.writelines(lines)
        buffered.flush()
        self.assertEqual(b''.join(sink._write_stack), b'abcdef')

    def test_writelines_error(self):
        sink = self.MockRawIO()
        buffered = self.tp(sink, 8)
        for bad_lines in ([1, 2, 3], None):
            self.assertRaises(TypeError, buffered.writelines, bad_lines)

    def test_destructor(self):
        # Collecting the object flushes the pending data to the raw stream.
        sink = self.MockRawIO()
        buffered = self.tp(sink, 8)
        buffered.write(b"abc")
        del buffered
        support.gc_collect()
        self.assertEqual(b"abc", sink._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            buffered = self.tp(raw, 8)
            buffered.write(b"abcdef")
            self.assertEqual(buffered.truncate(3), 3)
            self.assertEqual(buffered.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as result:
            self.assertEqual(result.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            offset = 0
            queue = deque()
            while offset < len(contents):
                size = next(sizes)
                queue.append(contents[offset:offset+size])
                offset += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode,
                           buffering=0) as raw:
                buffered = self.tp(raw, 8)
                errors = []
                def writer():
                    try:
                        while True:
                            try:
                                chunk = queue.popleft()
                            except IndexError:
                                return
                            buffered.write(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise
                threads = [threading.Thread(target=writer)
                           for _ in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                buffered.close()
            with self.open(support.TESTFN, "rb") as result:
                data = result.read()
            for i in range(256):
                self.assertEqual(data.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # seek(), tell() and write() report IOError on the misbehaving raw
        # stream.
        raw_stream = self.MisbehavedRawIO()
        buffered = self.tp(raw_stream, 5)
        self.assertRaises(IOError, buffered.seek, 0)
        self.assertRaises(IOError, buffered.tell)
        self.assertRaises(IOError, buffered.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument emits a DeprecationWarning.
        expected_warning = ("max_buffer_size is deprecated",
                            DeprecationWarning)
        with support.check_warnings(expected_warning):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        # A failing raw write during close() propagates, yet the object
        # still ends up closed.
        raw_stream = self.MockRawIO()
        def bad_write(b):
            raise IOError()
        raw_stream.write = bad_write
        buffered = self.tp(raw_stream)
        buffered.write(b'spam')
        self.assertRaises(IOError, buffered.close)  # exception not swallowed
        self.assertTrue(buffered.closed)
1347
Antoine Pitrou19690592009-06-12 20:14:08 +00001348
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the writer tests against io.BufferedWriter and adds checks that
    # are specific to it.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            raw_stream = self.MockRawIO()
            buffered = self.tp(raw_stream)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                              buffered.__init__, raw_stream, sys.maxsize)

    def test_initialization(self):
        # After every failed re-initialisation the object refuses to write.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, buffered.__init__, raw_stream,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, buffered.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        raw_stream = self.FileIO(support.TESTFN, "w+b")
        writer = self.tp(raw_stream)
        writer.write(b"123xxx")
        writer.x = writer  # make the object part of a reference cycle
        ref = weakref.ref(writer)
        del writer
        support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as result:
            self.assertEqual(result.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1391
Antoine Pitrou19690592009-06-12 20:14:08 +00001392
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the writer tests against the pyio implementation.
    tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001395
1396class BufferedRWPairTest(unittest.TestCase):
1397
Antoine Pitrou19690592009-06-12 20:14:08 +00001398 def test_constructor(self):
1399 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001400 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001401
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001402 def test_uninitialized(self):
1403 pair = self.tp.__new__(self.tp)
1404 del pair
1405 pair = self.tp.__new__(self.tp)
1406 self.assertRaisesRegexp((ValueError, AttributeError),
1407 'uninitialized|has no attribute',
1408 pair.read, 0)
1409 self.assertRaisesRegexp((ValueError, AttributeError),
1410 'uninitialized|has no attribute',
1411 pair.write, b'')
1412 pair.__init__(self.MockRawIO(), self.MockRawIO())
1413 self.assertEqual(pair.read(0), b'')
1414 self.assertEqual(pair.write(b''), 0)
1415
Antoine Pitrou19690592009-06-12 20:14:08 +00001416 def test_detach(self):
1417 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1418 self.assertRaises(self.UnsupportedOperation, pair.detach)
1419
1420 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001421 with support.check_warnings(("max_buffer_size is deprecated",
1422 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001423 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001424
1425 def test_constructor_with_not_readable(self):
1426 class NotReadable(MockRawIO):
1427 def readable(self):
1428 return False
1429
1430 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1431
1432 def test_constructor_with_not_writeable(self):
1433 class NotWriteable(MockRawIO):
1434 def writable(self):
1435 return False
1436
1437 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1438
1439 def test_read(self):
1440 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1441
1442 self.assertEqual(pair.read(3), b"abc")
1443 self.assertEqual(pair.read(1), b"d")
1444 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001445 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1446 self.assertEqual(pair.read(None), b"abc")
1447
1448 def test_readlines(self):
1449 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1450 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1451 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1452 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001453
1454 def test_read1(self):
1455 # .read1() is delegated to the underlying reader object, so this test
1456 # can be shallow.
1457 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1458
1459 self.assertEqual(pair.read1(3), b"abc")
1460
1461 def test_readinto(self):
1462 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1463
Martin Panterc9813d82016-06-03 05:59:20 +00001464 data = byteslike(5)
Antoine Pitrou19690592009-06-12 20:14:08 +00001465 self.assertEqual(pair.readinto(data), 5)
Martin Panterc9813d82016-06-03 05:59:20 +00001466 self.assertEqual(data.tobytes(), b"abcde")
Antoine Pitrou19690592009-06-12 20:14:08 +00001467
1468 def test_write(self):
1469 w = self.MockRawIO()
1470 pair = self.tp(self.MockRawIO(), w)
1471
1472 pair.write(b"abc")
1473 pair.flush()
Martin Panterc9813d82016-06-03 05:59:20 +00001474 buffer = bytearray(b"def")
1475 pair.write(buffer)
1476 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitrou19690592009-06-12 20:14:08 +00001477 pair.flush()
1478 self.assertEqual(w._write_stack, [b"abc", b"def"])
1479
1480 def test_peek(self):
1481 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1482
1483 self.assertTrue(pair.peek(3).startswith(b"abc"))
1484 self.assertEqual(pair.read(3), b"abc")
1485
1486 def test_readable(self):
1487 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1488 self.assertTrue(pair.readable())
1489
1490 def test_writeable(self):
1491 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1492 self.assertTrue(pair.writable())
1493
1494 def test_seekable(self):
1495 # BufferedRWPairs are never seekable, even if their readers and writers
1496 # are.
1497 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1498 self.assertFalse(pair.seekable())
1499
1500 # .flush() is delegated to the underlying writer object and has been
1501 # tested in the test_write method.
1502
1503 def test_close_and_closed(self):
1504 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1505 self.assertFalse(pair.closed)
1506 pair.close()
1507 self.assertTrue(pair.closed)
1508
Serhiy Storchakaf95a57f2015-03-24 23:23:42 +02001509 def test_reader_close_error_on_close(self):
1510 def reader_close():
1511 reader_non_existing
1512 reader = self.MockRawIO()
1513 reader.close = reader_close
1514 writer = self.MockRawIO()
1515 pair = self.tp(reader, writer)
1516 with self.assertRaises(NameError) as err:
1517 pair.close()
1518 self.assertIn('reader_non_existing', str(err.exception))
1519 self.assertTrue(pair.closed)
1520 self.assertFalse(reader.closed)
1521 self.assertTrue(writer.closed)
1522
1523 def test_writer_close_error_on_close(self):
1524 def writer_close():
1525 writer_non_existing
1526 reader = self.MockRawIO()
1527 writer = self.MockRawIO()
1528 writer.close = writer_close
1529 pair = self.tp(reader, writer)
1530 with self.assertRaises(NameError) as err:
1531 pair.close()
1532 self.assertIn('writer_non_existing', str(err.exception))
1533 self.assertFalse(pair.closed)
1534 self.assertTrue(reader.closed)
1535 self.assertFalse(writer.closed)
1536
1537 def test_reader_writer_close_error_on_close(self):
1538 def reader_close():
1539 reader_non_existing
1540 def writer_close():
1541 writer_non_existing
1542 reader = self.MockRawIO()
1543 reader.close = reader_close
1544 writer = self.MockRawIO()
1545 writer.close = writer_close
1546 pair = self.tp(reader, writer)
1547 with self.assertRaises(NameError) as err:
1548 pair.close()
1549 self.assertIn('reader_non_existing', str(err.exception))
1550 self.assertFalse(pair.closed)
1551 self.assertFalse(reader.closed)
1552 self.assertFalse(writer.closed)
1553
Antoine Pitrou19690592009-06-12 20:14:08 +00001554 def test_isatty(self):
1555 class SelectableIsAtty(MockRawIO):
1556 def __init__(self, isatty):
1557 MockRawIO.__init__(self)
1558 self._isatty = isatty
1559
1560 def isatty(self):
1561 return self._isatty
1562
1563 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1564 self.assertFalse(pair.isatty())
1565
1566 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1567 self.assertTrue(pair.isatty())
1568
1569 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1570 self.assertTrue(pair.isatty())
1571
1572 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1573 self.assertTrue(pair.isatty())
1574
Benjamin Peterson1c873bf2014-09-29 22:46:57 -04001575 def test_weakref_clearing(self):
1576 brw = self.tp(self.MockRawIO(), self.MockRawIO())
1577 ref = weakref.ref(brw)
1578 brw = None
1579 ref = None # Shouldn't segfault.
1580
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPair tests against io.BufferedRWPair.
    tp = io.BufferedRWPair
1583
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPair tests against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001586
1587
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Tests for BufferedRandom.  Both the reader and the writer suites are
    # inherited; tests that exist under the same name in both parents are
    # overridden below so that each parent's version is run.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_constructor(self)

    def test_uninitialized(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_uninitialized(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        for chunk in (b"ddd", b"eee"):
            rw.write(chunk)
        self.assertFalse(raw._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"123f")
        rw.seek(0, 0)
        self.assertEqual(b"asdf123fl", rw.read())
        self.assertEqual(9, rw.tell())
        # (offset, whence, position reported by tell() afterwards)
        for offset, whence, position in [(-4, 2, 5), (2, 1, 7)]:
            rw.seek(offset, whence)
            self.assertEqual(position, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        rw.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        # A float offset is rejected.
        seek = rw.seek
        with self.assertRaises(TypeError):
            seek(0.0)

    def check_flush_and_read(self, read_func):
        # read_func(bufio[, n]) must return the bytes it consumed from bufio.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Change the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        def read_plain(stream, *args):
            return stream.read(*args)
        self.check_flush_and_read(read_plain)

    def test_flush_and_readinto(self):
        def read_via_readinto(stream, size=-1):
            if size < 0:
                size = 9999
            target = bytearray(size)
            filled = stream.readinto(target)
            return bytes(target[:filled])
        self.check_flush_and_read(read_via_readinto)

    def test_flush_and_peek(self):
        def read_via_peek(stream, size=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = stream.peek(size)
            if size != -1:
                data = data[:size]
            # peek() does not advance the position, so do it by hand.
            stream.seek(len(data), 1)
            return data
        self.check_flush_and_read(read_via_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        for chunk in (b"123", b"45"):
            bufio.write(chunk)
            bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_threads(self)

    def test_writes_and_peek(self):
        def peek_in_place(stream):
            stream.peek(1)
        self.check_writes(peek_in_place)

        def peek_one_back(stream):
            saved = stream.tell()
            stream.seek(-1, 1)
            stream.peek(1)
            stream.seek(saved, 0)
        self.check_writes(peek_one_back)

    def test_writes_and_reads(self):
        def reread_last_byte(stream):
            stream.seek(-1, 1)
            stream.read(1)
        self.check_writes(reread_last_byte)

    def test_writes_and_read1s(self):
        def reread_last_byte(stream):
            stream.seek(-1, 1)
            stream.read1(1)
        self.check_writes(reread_last_byte)

    def test_writes_and_readintos(self):
        def reread_last_byte(stream):
            stream.seek(-1, 1)
            stream.readinto(bytearray(1))
        self.check_writes(reread_last_byte)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            end_pos = overwrite_size + 1
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), end_pos)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), end_pos)
            expected = (b"A" + b"B" * overwrite_size +
                        b"A" * (9 - overwrite_size))
            self.assertEqual(raw.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards /
        # writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for i in range(size):
            for j in range(i, size):
                raw = self.BytesIO(original)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                expected = bytearray(original)
                # Same order as the writes: the second one wins when i == j.
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA")  # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2)  # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw, self.tp(raw, 100) as f:
            f.write(b"1")
            self.assertEqual(f.read(1), b'b')
            f.write(b'2')
            self.assertEqual(f.read1(1), b'd')
            f.write(b'3')
            one_byte = bytearray(1)
            f.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            f.write(b'4')
            self.assertEqual(f.peek(1), b'h')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw, self.tp(raw, 100) as f:
            self.assertEqual(f.read(1), b'a')
            f.write(b"2")
            self.assertEqual(f.read(1), b'c')
            f.flush()
            self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Each step overwrites one byte, then reads the rest of that line.
        steps = [(b'1', b'b\n'), (b'2', b'def\n'), (b'3', b'\n')]
        with self.BytesIO(b'ab\ncdef\ng\n') as raw, self.tp(raw) as f:
            for written, rest_of_line in steps:
                f.write(written)
                self.assertEqual(f.readline(), rest_of_line)
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1812
R David Murray5b2cf5e2013-02-23 22:11:21 -05001813
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
                          BufferedRandomTest, SizeofTest):
    # Runs the BufferedRandom tests against io.BufferedRandom and adds
    # checks that are only made for this implementation.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        reinit = bufio.__init__
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            reinit(rawio, sys.maxsize)

    def test_garbage_collection(self):
        for parent in (CBufferedReaderTest, CBufferedWriterTest):
            parent.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        # Too many arguments: the TypeError message must name the class.
        with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1836
1837
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the shared BufferedRandom tests against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1840
1841
Christian Heimes1a6387e2008-03-26 12:49:49 +00001842# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1843# properties:
1844# - A single output character can correspond to many bytes of input.
1845# - The number of input bytes to complete the character can be
1846# undetermined until the last input byte is received.
1847# - The number of input bytes can vary depending on previous input.
1848# - A single input byte can correspond to many characters of output.
1849# - The number of output characters can be undetermined until the
1850# last input byte is received.
1851# - The number of output characters can vary depending on previous input.
1852
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  A new I or O applies to the input that
    follows the word that set it.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and no buffered input."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags), with I and O packed into flags."""
        i, o = self.i ^ 1, self.o ^ 1  # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously returned by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input* to the decoder; return the new output.

        If *final* is true, a word still sitting in the buffer is
        terminated as if EOF had been reached.
        """
        output = ''
        period = ord('.')
        # Iterate over integers (as process_word() already assumes) instead
        # of comparing a byte with a text literal: under unicode_literals
        # that comparison relies on implicit coercion, which warns for
        # non-ASCII bytes on Python 2 and never matches on Python 3.
        for b in bytearray(input):
            if self.i == 0:  # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer:  # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        Control words ('i...' and 'o...') update I or O and produce no
        output; a missing number counts as 0.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            # pad out with hyphens (a no-op when the word is long enough)
            output = output.ljust(self.o, '-')
            if self.o:
                output = output[:self.o]  # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # The codec lookup below only answers while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the 'test_decoder' codec.

        Returns a CodecInfo that encodes with latin-1 and decodes
        incrementally with this class, or None when the codec is disabled
        or *name* is a different codec.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
1937
# Register the lookup function of the decoder above as a codec search
# function.  The codec is disabled by default (codecEnabled is False);
# tests enable it when they need it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1941
1942
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            decoded = decoder.decode(data, final)
            self.assertEqual(decoded, expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001985
1986class TextIOWrapperTest(unittest.TestCase):
1987
def setUp(self):
    # Sample data with mixed line endings, and the text it reads as once
    # every line ending has become "\n".
    mixed_newlines = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.testdata = mixed_newlines
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
    # Start every test without a leftover scratch file.
    scratch = support.TESTFN
    support.unlink(scratch)
Antoine Pitrou19690592009-06-12 20:14:08 +00001991 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001992
def tearDown(self):
    # Remove the scratch file a test may have left behind.
    support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001995
Antoine Pitrou19690592009-06-12 20:14:08 +00001996 def test_constructor(self):
1997 r = self.BytesIO(b"\xc3\xa9\n\n")
1998 b = self.BufferedReader(r, 1000)
1999 t = self.TextIOWrapper(b)
2000 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti2623a372010-11-21 13:34:58 +00002001 self.assertEqual(t.encoding, "latin1")
2002 self.assertEqual(t.line_buffering, False)
Antoine Pitrou19690592009-06-12 20:14:08 +00002003 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti2623a372010-11-21 13:34:58 +00002004 self.assertEqual(t.encoding, "utf8")
2005 self.assertEqual(t.line_buffering, True)
2006 self.assertEqual("\xe9\n", t.readline())
Antoine Pitrou19690592009-06-12 20:14:08 +00002007 self.assertRaises(TypeError, t.__init__, b, newline=42)
2008 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2009
def test_uninitialized(self):
    cls = self.TextIOWrapper
    # An object that never ran __init__ must be safe to discard ...
    text = cls.__new__(cls)
    del text
    # ... and must raise a regular exception when it is used.
    text = cls.__new__(cls)
    with self.assertRaises(Exception):
        repr(text)
    read = text.read
    with self.assertRaisesRegexp((ValueError, AttributeError),
                                 'uninitialized|has no attribute'):
        read(0)
    # Running __init__ afterwards yields a usable object.
    text.__init__(self.MockRawIO())
    self.assertEqual(text.read(0), u'')
2020
def test_non_text_encoding_codecs_are_rejected(self):
    # Ensure the constructor complains if passed a codec that isn't
    # marked as a text encoding
    # http://bugs.python.org/issue20404
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    # The construction itself is expected to succeed; it is only run
    # under the py3k warning check.
    with support.check_py3k_warnings():
        self.TextIOWrapper(buffered, encoding="hex_codec")
2029
Antoine Pitrou19690592009-06-12 20:14:08 +00002030 def test_detach(self):
2031 r = self.BytesIO()
2032 b = self.BufferedWriter(r)
2033 t = self.TextIOWrapper(b)
2034 self.assertIs(t.detach(), b)
2035
2036 t = self.TextIOWrapper(b, encoding="ascii")
2037 t.write("howdy")
2038 self.assertFalse(r.getvalue())
2039 t.detach()
2040 self.assertEqual(r.getvalue(), b"howdy")
2041 self.assertRaises(ValueError, t.detach)
2042
Benjamin Peterson53ae6142014-12-21 20:51:50 -06002043 # Operations independent of the detached stream should still work
2044 repr(t)
2045 self.assertEqual(t.encoding, "ascii")
2046 self.assertEqual(t.errors, "strict")
2047 self.assertFalse(t.line_buffering)
2048
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    # Without a name on the raw stream, only the encoding is shown.
    expected = "<%s.TextIOWrapper encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    # A name set on the raw stream shows up with its own repr.
    raw.name = "dummy"
    expected = "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    raw.name = b"dummy"
    expected = "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    text.buffer.detach()
    repr(text)  # Should not raise an exception
2065
Antoine Pitrou19690592009-06-12 20:14:08 +00002066 def test_line_buffering(self):
2067 r = self.BytesIO()
2068 b = self.BufferedWriter(r, 1000)
2069 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
2070 t.write("X")
Ezio Melotti2623a372010-11-21 13:34:58 +00002071 self.assertEqual(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00002072 t.write("Y\nZ")
Ezio Melotti2623a372010-11-21 13:34:58 +00002073 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00002074 t.write("A\rB")
Ezio Melotti2623a372010-11-21 13:34:58 +00002075 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002076
Antoine Pitrou19690592009-06-12 20:14:08 +00002077 def test_encoding(self):
2078 # Check the encoding attribute is always set, and valid
2079 b = self.BytesIO()
2080 t = self.TextIOWrapper(b, encoding="utf8")
2081 self.assertEqual(t.encoding, "utf8")
2082 t = self.TextIOWrapper(b)
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03002083 self.assertIsNotNone(t.encoding)
Antoine Pitrou19690592009-06-12 20:14:08 +00002084 codecs.lookup(t.encoding)
2085
2086 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002087 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00002088 b = self.BytesIO(b"abc\n\xff\n")
2089 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002090 self.assertRaises(UnicodeError, t.read)
2091 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00002092 b = self.BytesIO(b"abc\n\xff\n")
2093 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002094 self.assertRaises(UnicodeError, t.read)
2095 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00002096 b = self.BytesIO(b"abc\n\xff\n")
2097 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti2623a372010-11-21 13:34:58 +00002098 self.assertEqual(t.read(), "abc\n\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002099 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00002100 b = self.BytesIO(b"abc\n\xff\n")
2101 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti2623a372010-11-21 13:34:58 +00002102 self.assertEqual(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002103
Antoine Pitrou19690592009-06-12 20:14:08 +00002104 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002105 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00002106 b = self.BytesIO()
2107 t = self.TextIOWrapper(b, encoding="ascii")
2108 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002109 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00002110 b = self.BytesIO()
2111 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2112 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002113 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00002114 b = self.BytesIO()
2115 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00002116 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00002117 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002118 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002119 self.assertEqual(b.getvalue(), b"abcdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002120 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00002121 b = self.BytesIO()
2122 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00002123 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00002124 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002125 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002126 self.assertEqual(b.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002127
Antoine Pitrou19690592009-06-12 20:14:08 +00002128 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002129 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2130
2131 tests = [
2132 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
2133 [ '', input_lines ],
2134 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2135 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2136 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
2137 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002138 encodings = (
2139 'utf-8', 'latin-1',
2140 'utf-16', 'utf-16-le', 'utf-16-be',
2141 'utf-32', 'utf-32-le', 'utf-32-be',
2142 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00002143
2144 # Try a range of buffer sizes to test the case where \r is the last
2145 # character in TextIOWrapper._pending_line.
2146 for encoding in encodings:
2147 # XXX: str.encode() should return bytes
2148 data = bytes(''.join(input_lines).encode(encoding))
2149 for do_reads in (False, True):
2150 for bufsize in range(1, 10):
2151 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002152 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2153 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00002154 encoding=encoding)
2155 if do_reads:
2156 got_lines = []
2157 while True:
2158 c2 = textio.read(2)
2159 if c2 == '':
2160 break
Ezio Melotti2623a372010-11-21 13:34:58 +00002161 self.assertEqual(len(c2), 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002162 got_lines.append(c2 + textio.readline())
2163 else:
2164 got_lines = list(textio)
2165
2166 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti2623a372010-11-21 13:34:58 +00002167 self.assertEqual(got_line, exp_line)
2168 self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002169
Antoine Pitrou19690592009-06-12 20:14:08 +00002170 def test_newlines_input(self):
2171 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002172 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2173 for newline, expected in [
2174 (None, normalized.decode("ascii").splitlines(True)),
2175 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00002176 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2177 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2178 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00002179 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00002180 buf = self.BytesIO(testdata)
2181 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti2623a372010-11-21 13:34:58 +00002182 self.assertEqual(txt.readlines(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002183 txt.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002184 self.assertEqual(txt.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002185
Antoine Pitrou19690592009-06-12 20:14:08 +00002186 def test_newlines_output(self):
2187 testdict = {
2188 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2189 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2190 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2191 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2192 }
2193 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2194 for newline, expected in tests:
2195 buf = self.BytesIO()
2196 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2197 txt.write("AAA\nB")
2198 txt.write("BB\nCCC\n")
2199 txt.write("X\rY\r\nZ")
2200 txt.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002201 self.assertEqual(buf.closed, False)
2202 self.assertEqual(buf.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00002203
def test_destructor(self):
    """Dropping the last reference to a wrapper flushes and closes its buffer.

    The BytesIO subclass records its contents at close() time, so the
    final assertion shows the pending text was written before the close.
    """
    seen_at_close = []
    parent = self.BytesIO

    class MyBytesIO(parent):
        def close(self):
            # Capture the value first: it is unavailable once closed.
            seen_at_close.append(self.getvalue())
            parent.close(self)

    raw = MyBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Antoine Pitrou19690592009-06-12 20:14:08 +00002217
def test_override_destructor(self):
    """Overridden __del__, close() and flush() run in that order.

    Each override appends a marker (1, 2, 3) and then defers to the
    parent implementation; the recorded sequence is checked after the
    wrapper has been collected.
    """
    calls = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The parent may not define __del__ at all; chain up only
            # when it does.
            inherited = getattr(super(MyTextIO, self), "__del__", None)
            if inherited is not None:
                inherited()

        def close(self):
            calls.append(2)
            super(MyTextIO, self).close()

        def flush(self):
            calls.append(3)
            super(MyTextIO, self).flush()

    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2240
def test_error_through_destructor(self):
    """The exception state is not modified by a destructor,
    even if close() fails.

    The AttributeError raised by the bogus attribute lookup must reach
    assertRaises even though the temporary wrapper is destroyed (and its
    close() fails) while that exception is propagating.
    """
    failing_raw = self.CloseFailureIO()

    def touch_missing_attribute():
        self.TextIOWrapper(failing_raw).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    report = captured.getvalue().strip()
    if not report:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(report.splitlines()), 1)
    self.assertTrue(report.startswith("Exception IOError: "), report)
    self.assertTrue(report.endswith(" ignored"), report)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002255
2256 # Systematic tests of the text I/O API
2257
def test_basic_io(self):
    """Exercise write/read/seek/tell on a real file for many chunk sizes.

    For every (chunk size, encoding) pair the file is written, reopened
    in "r+" mode and read back in several ways, then extended at the end.
    For the UTF encodings the multi-line round trip in multi_line_test()
    is run on the same open file.

    Files are opened in ``with`` blocks so they are closed even when an
    assertion fails part-way through.
    """
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin1", "utf8":  # , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Opaque position of end-of-file, reused below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2286
def multi_line_test(self, f, enc):
    """Round-trip lines of many lengths through *f*, checking tell().

    *f* is emptied, lines of increasing length are written while the
    position before each one is recorded, and the file is then read back
    line by line. The (position, line) pairs seen while reading must
    equal those recorded while writing. *enc* is accepted but not used.
    """
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build `size` characters.
        text = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        written.append((f.tell(), text))
        f.write(text)
    f.seek(0)
    read_back = []
    where = f.tell()
    text = f.readline()
    while text:
        read_back.append((where, text))
        where = f.tell()
        text = f.readline()
    self.assertEqual(read_back, written)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002308
def test_telling(self):
    """tell() returns consistent positions around writes and readline().

    Positions recorded while writing two lines must be reported again
    when the lines are read back with readline(). While iterating over
    the file tell() must raise IOError, and it must work again once the
    iteration has finished.

    The file is opened in a ``with`` block so it is closed even when an
    assertion fails.
    """
    with self.open(support.TESTFN, "w+", encoding="utf8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to be refused in the middle of iteration.
            self.assertRaises(IOError, f.tell)
        self.assertEqual(f.tell(), p2)
2328
def test_seeking(self):
    """tell() and readline() stay correct near a chunk boundary.

    Each line is an ASCII prefix two characters shorter than the default
    chunk size followed by a multi-byte character and a newline. After
    reading exactly the prefix, tell() must equal the prefix length and
    readline() must return the suffix intact.

    Both files are opened in ``with`` blocks; previously the handle used
    for reading was never closed.
    """
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = bytes(u_suffix.encode("utf-8"))
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002346
def test_seeking_too(self):
    """Regression test for a specific bug.

    With a chunk size of 2, readline() followed by tell() on a line that
    starts with a three-byte UTF-8 sequence must not fail.

    Both files are opened in ``with`` blocks; previously the handle used
    for reading was never closed.
    """
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
2358
def test_seek_and_tell(self):
    """Test seek/tell using the StatefulIncrementalDecoder.

    Every test case of StatefulIncrementalDecoderTest is written to a
    file and read back through the 'test_decoder' encoding, first as-is
    and then shifted so that it crosses a chunk boundary.

    Files are opened in ``with`` blocks so that a failing assertion does
    not leave a handle open.
    """
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def check_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for payload, _, _ in StatefulIncrementalDecoderTest.test_cases:
            check_with_data(payload)

        # Position each test case so that it crosses a chunk boundary.
        for payload, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(payload)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            check_with_data(prefix + payload, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002405
Antoine Pitrou19690592009-06-12 20:14:08 +00002406 def test_encoded_writes(self):
2407 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002408 tests = ("utf-16",
2409 "utf-16-le",
2410 "utf-16-be",
2411 "utf-32",
2412 "utf-32-le",
2413 "utf-32-be")
2414 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002415 buf = self.BytesIO()
2416 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002417 # Check if the BOM is written only once (see issue1753).
2418 f.write(data)
2419 f.write(data)
2420 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002421 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002422 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002423 self.assertEqual(f.read(), data * 2)
2424 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002425
Antoine Pitrou19690592009-06-12 20:14:08 +00002426 def test_unreadable(self):
2427 class UnReadable(self.BytesIO):
2428 def readable(self):
2429 return False
2430 txt = self.TextIOWrapper(UnReadable())
2431 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002432
Antoine Pitrou19690592009-06-12 20:14:08 +00002433 def test_read_one_by_one(self):
2434 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002435 reads = ""
2436 while True:
2437 c = txt.read(1)
2438 if not c:
2439 break
2440 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002441 self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002442
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002443 def test_readlines(self):
2444 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2445 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2446 txt.seek(0)
2447 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2448 txt.seek(0)
2449 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2450
Christian Heimes1a6387e2008-03-26 12:49:49 +00002451 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002452 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002453 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002454 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002455 reads = ""
2456 while True:
2457 c = txt.read(128)
2458 if not c:
2459 break
2460 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002461 self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002462
Antoine Pitroueadca1d2012-10-16 23:02:27 +02002463 def test_writelines(self):
2464 l = ['ab', 'cd', 'ef']
2465 buf = self.BytesIO()
2466 txt = self.TextIOWrapper(buf)
2467 txt.writelines(l)
2468 txt.flush()
2469 self.assertEqual(buf.getvalue(), b'abcdef')
2470
2471 def test_writelines_userlist(self):
2472 l = UserList(['ab', 'cd', 'ef'])
2473 buf = self.BytesIO()
2474 txt = self.TextIOWrapper(buf)
2475 txt.writelines(l)
2476 txt.flush()
2477 self.assertEqual(buf.getvalue(), b'abcdef')
2478
2479 def test_writelines_error(self):
2480 txt = self.TextIOWrapper(self.BytesIO())
2481 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2482 self.assertRaises(TypeError, txt.writelines, None)
2483 self.assertRaises(TypeError, txt.writelines, b'abc')
2484
Christian Heimes1a6387e2008-03-26 12:49:49 +00002485 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002486 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002487
2488 # read one char at a time
2489 reads = ""
2490 while True:
2491 c = txt.read(1)
2492 if not c:
2493 break
2494 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002495 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002496
2497 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002498 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002499 txt._CHUNK_SIZE = 4
2500
2501 reads = ""
2502 while True:
2503 c = txt.read(4)
2504 if not c:
2505 break
2506 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002507 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002508
2509 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002510 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002511 txt._CHUNK_SIZE = 4
2512
2513 reads = txt.read(4)
2514 reads += txt.read(4)
2515 reads += txt.readline()
2516 reads += txt.readline()
2517 reads += txt.readline()
Ezio Melotti2623a372010-11-21 13:34:58 +00002518 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002519
2520 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002521 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002522 txt._CHUNK_SIZE = 4
2523
2524 reads = txt.read(4)
2525 reads += txt.read()
Ezio Melotti2623a372010-11-21 13:34:58 +00002526 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002527
2528 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002529 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002530 txt._CHUNK_SIZE = 4
2531
2532 reads = txt.read(4)
2533 pos = txt.tell()
2534 txt.seek(0)
2535 txt.seek(pos)
Ezio Melotti2623a372010-11-21 13:34:58 +00002536 self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002537
2538 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002539 buffer = self.BytesIO(self.testdata)
2540 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002541
2542 self.assertEqual(buffer.seekable(), txt.seekable())
2543
def test_append_bom(self):
    """The BOM is not written again when appending to a non-empty file."""
    filename = support.TESTFN

    def assert_file_bytes(expected):
        # Compare the raw file contents with the expected byte string.
        with self.open(filename, 'rb') as raw:
            self.assertEqual(raw.read(), expected)

    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            f.tell()
        assert_file_bytes('aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        assert_file_bytes('aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002558
def test_seek_bom(self):
    """The BOM is written once even when positions are set by seek().

    After writing 'aaa', the file is reopened in "r+" mode, 'zzz' is
    written at the saved end position and 'bbb' at the start; the raw
    bytes must equal a one-shot encode of 'bbbzzz'.
    """
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as writer:
            writer.write('aaa')
            end_pos = writer.tell()
        with self.open(filename, 'r+', encoding=charset) as updater:
            updater.seek(end_pos)
            updater.write('zzz')
            updater.seek(0)
            updater.write('bbb')
        with self.open(filename, 'rb') as raw:
            self.assertEqual(raw.read(), 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002573
def test_errors_property(self):
    """``errors`` is "strict" by default and echoes an explicit value."""
    path = support.TESTFN
    with self.open(path, "w") as default_handle:
        self.assertEqual(default_handle.errors, "strict")
    with self.open(path, "w", errors="replace") as lenient_handle:
        self.assertEqual(lenient_handle.errors, "replace")
2579
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    """Issue6750: concurrent writes could duplicate data.

    Twenty threads wait on one event and then each write a distinct line
    to the same line-buffered file; every line must appear exactly once.
    """
    start = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = "Thread%03d\n" % n
            # Block until all threads are released together.
            start.wait()
            f.write(text)

        workers = []
        for x in range(20):
            workers.append(threading.Thread(target=run, args=(x,)))
        with support.start_threads(workers, start.set):
            time.sleep(0.02)
    with self.open(support.TESTFN) as f:
        content = f.read()
        for n in range(20):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002597
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002598 def test_flush_error_on_close(self):
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002599 # Test that text file is closed despite failed flush
2600 # and that flush() is called before file closed.
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002601 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002602 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002603 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002604 closed[:] = [txt.closed, txt.buffer.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002605 raise IOError()
2606 txt.flush = bad_flush
2607 self.assertRaises(IOError, txt.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -06002608 self.assertTrue(txt.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002609 self.assertTrue(txt.buffer.closed)
2610 self.assertTrue(closed) # flush() called
2611 self.assertFalse(closed[0]) # flush() called before file closed
2612 self.assertFalse(closed[1])
Serhiy Storchaka437d5352015-02-23 00:28:38 +02002613 txt.flush = lambda: None # break reference loop
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002614
2615 def test_multi_close(self):
2616 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2617 txt.close()
2618 txt.close()
2619 txt.close()
2620 self.assertRaises(ValueError, txt.flush)
2621
Antoine Pitroufc9ead62010-12-21 21:26:55 +00002622 def test_readonly_attributes(self):
2623 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2624 buf = self.BytesIO(self.testdata)
2625 with self.assertRaises((AttributeError, TypeError)):
2626 txt.buffer = buf
2627
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002628 def test_read_nonbytes(self):
2629 # Issue #17106
2630 # Crash when underlying read() returns non-bytes
2631 class NonbytesStream(self.StringIO):
2632 read1 = self.StringIO.read
2633 class NonbytesStream(self.StringIO):
2634 read1 = self.StringIO.read
2635 t = self.TextIOWrapper(NonbytesStream('a'))
2636 with self.maybeRaises(TypeError):
2637 t.read(1)
2638 t = self.TextIOWrapper(NonbytesStream('a'))
2639 with self.maybeRaises(TypeError):
2640 t.readline()
2641 t = self.TextIOWrapper(NonbytesStream('a'))
2642 self.assertEqual(t.read(), u'a')
2643
def test_illegal_decoder(self):
    """Issue #17106: crash when the decoder returns a non-string.

    A wrapper using 'quopri_codec' is built three times and read with
    read(1), readline() and read(); each read goes through
    self.maybeRaises(TypeError).
    """
    def make_wrapper():
        # Construction happens inside check_py3k_warnings(), as it did
        # for each of the three wrappers before they were factored out.
        with support.check_py3k_warnings():
            return self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
                                      encoding='quopri_codec')

    t = make_wrapper()
    with self.maybeRaises(TypeError):
        t.read(1)
    t = make_wrapper()
    with self.maybeRaises(TypeError):
        t.readline()
    t = make_wrapper()
    with self.maybeRaises(TypeError):
        t.read()
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002679
2680
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest plus checks specific to the C implementation.

    ``maybeRaises`` is bound to assertRaises here, so the shared tests
    that use it require the exception to be raised.
    """

    def test_initialization(self):
        """A failed re-__init__ leaves the wrapper unusable.

        After __init__ is called again with an invalid ``newline``,
        read() must raise ValueError. repr() of an instance that was
        created with __new__ only must raise as well.
        """
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        for expected, bad_newline in ((TypeError, 42), (ValueError, 'xyzzy')):
            self.assertRaises(expected, wrapper.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

        wrapper = self.TextIOWrapper.__new__(self.TextIOWrapper)
        self.assertRaises(Exception, repr, wrapper)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(rawio)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Self-reference: only the cycle collector can free the wrapper.
        wrapper.x = wrapper
        probe = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertIsNone(probe(), probe)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            t1 = self.TextIOWrapper(b1, encoding="ascii")
            b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            t2 = self.TextIOWrapper(b2, encoding="ascii")
            # circular references
            t1.buddy = t2
            t2.buddy = t1
        support.gc_collect()

    # Strict variant: the guarded body must raise the given exception.
    maybeRaises = unittest.TestCase.assertRaises
2726
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002727
class PyTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest with a ``maybeRaises`` that checks nothing."""

    @contextlib.contextmanager
    def maybeRaises(self, *args, **kwds):
        """Context manager that ignores its arguments and asserts nothing.

        The guarded body simply runs; no exception is required and none
        is caught here.
        """
        yield None
Antoine Pitrou19690592009-06-12 20:14:08 +00002732
2733
2734class IncrementalNewlineDecoderTest(unittest.TestCase):
2735
2736 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002737 # UTF-8 specific tests for a newline decoder
2738 def _check_decode(b, s, **kwargs):
2739 # We exercise getstate() / setstate() as well as decode()
2740 state = decoder.getstate()
Ezio Melotti2623a372010-11-21 13:34:58 +00002741 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002742 decoder.setstate(state)
Ezio Melotti2623a372010-11-21 13:34:58 +00002743 self.assertEqual(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002744
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002745 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002746
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002747 _check_decode(b'\xe8', "")
2748 _check_decode(b'\xa2', "")
2749 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002750
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002751 _check_decode(b'\xe8', "")
2752 _check_decode(b'\xa2', "")
2753 _check_decode(b'\x88', "\u8888")
2754
2755 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002756 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2757
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002758 decoder.reset()
2759 _check_decode(b'\n', "\n")
2760 _check_decode(b'\r', "")
2761 _check_decode(b'', "\n", final=True)
2762 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002763
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002764 _check_decode(b'\r', "")
2765 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002766
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002767 _check_decode(b'\r\r\n', "\n\n")
2768 _check_decode(b'\r', "")
2769 _check_decode(b'\r', "\n")
2770 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002771
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002772 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2773 _check_decode(b'\xe8\xa2\x88', "\u8888")
2774 _check_decode(b'\n', "\n")
2775 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2776 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002777
Antoine Pitrou19690592009-06-12 20:14:08 +00002778 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002779 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002780 if encoding is not None:
2781 encoder = codecs.getincrementalencoder(encoding)()
2782 def _decode_bytewise(s):
2783 # Decode one byte at a time
2784 for b in encoder.encode(s):
2785 result.append(decoder.decode(b))
2786 else:
2787 encoder = None
2788 def _decode_bytewise(s):
2789 # Decode one char at a time
2790 for c in s:
2791 result.append(decoder.decode(c))
Ezio Melotti2623a372010-11-21 13:34:58 +00002792 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002793 _decode_bytewise("abc\n\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002794 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002795 _decode_bytewise("\nabc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002796 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002797 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002798 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002799 _decode_bytewise("abc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002800 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002801 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002802 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002803 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002804 input = "abc"
2805 if encoder is not None:
2806 encoder.reset()
2807 input = encoder.encode(input)
Ezio Melotti2623a372010-11-21 13:34:58 +00002808 self.assertEqual(decoder.decode(input), "abc")
2809 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002810
2811 def test_newline_decoder(self):
2812 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002813 # None meaning the IncrementalNewlineDecoder takes unicode input
2814 # rather than bytes input
2815 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002816 'utf-16', 'utf-16-le', 'utf-16-be',
2817 'utf-32', 'utf-32-le', 'utf-32-be',
2818 )
2819 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002820 decoder = enc and codecs.getincrementaldecoder(enc)()
2821 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2822 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002823 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002824 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2825 self.check_newline_decoding_utf8(decoder)
2826
def test_newline_bytes(self):
    # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
    # Feed U+0D00 and U+0A00 (neither is a line ending) through a decoder
    # with no underlying codec and verify that they come back unchanged and
    # that no newline is ever recorded, with and without translation.
    for translate in (False, True):
        decoder = self.IncrementalNewlineDecoder(None, translate=translate)
        self.assertEqual(decoder.newlines, None)
        for char in ("\u0D00", "\u0A00"):
            self.assertEqual(decoder.decode(char), char)
            self.assertEqual(decoder.newlines, None)
2839
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Run the IncrementalNewlineDecoder tests against the C io namespace.

    The class body is intentionally empty: test_main() copies the C
    implementation's names onto every test class whose name starts with "C".
    """
2842
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Run the IncrementalNewlineDecoder tests against the pure-Python io.

    The class body is intentionally empty: test_main() copies the Python
    implementation's names onto every test class whose name starts with "Py".
    """
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002845
Christian Heimes1a6387e2008-03-26 12:49:49 +00002846
2847# XXX Tests for open()
2848
class MiscIOTest(unittest.TestCase):
    """Miscellaneous checks of the io namespace under test.

    Concrete subclasses set ``io`` to the module being exercised; test_main()
    additionally copies that module's public names (``open``, ``IOBase``,
    ``BlockingIOError``, ...) onto the subclass, which is why they are
    reached through ``self`` here.
    """

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Every exported name must exist and be an exception class, a SEEK_*
        # constant, the open() function or an IOBase subclass.
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertIsNotNone(obj, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                # Pass the name so a failure says which export is wrong.
                self.assertTrue(issubclass(obj, self.IOBase), name)

    def test_attributes(self):
        # Check the ``mode`` and ``name`` attributes at each layer of the
        # stack.  Files are opened with ``with`` so that they are closed even
        # when an assertion fails; otherwise the still-open file would leak
        # into tearDown(), which removes TESTFN.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with self.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # Second file object sharing f's descriptor without owning it.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        # Every I/O method must raise ValueError once the file is closed,
        # for each combination of text/binary, read/write and buffering.
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # Not every layer provides the methods guarded by hasattr().
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readall"):
                self.assertRaises(ValueError, f.readall)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)
        class C(unicode):
            pass
        # Build a reference cycle between the exception and its message and
        # check that the garbage collector is able to reclaim it.
        c = C("")
        b = self.BlockingIOError(1, c)
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        """Check that open() results are instances of the right ABCs.

        *abcmodule* is any object exposing IOBase, RawIOBase, BufferedIOBase
        and TextIOBase attributes (a module, or this test case itself).
        """
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_bigbuf(self):
        self._test_nonblock_pipe_write(16*1024)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_smallbuf(self):
        self._test_nonblock_pipe_write(1024)

    def _set_non_blocking(self, fd):
        """Add O_NONBLOCK to the status flags of file descriptor *fd*."""
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        self.assertNotEqual(flags, -1)
        res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        self.assertEqual(res, 0)

    def _test_nonblock_pipe_write(self, bufsize):
        """Write through a non-blocking pipe using buffers of *bufsize*.

        Everything that the writer reports as written must be read back
        unchanged from the other end, and both file objects must be closed
        once their ``with`` block has been left.
        """
        sent = []
        received = []
        r, w = os.pipe()
        self._set_non_blocking(r)
        self._set_non_blocking(w)

        # To exercise all code paths in the C implementation we need
        # to play with buffer sizes.  For instance, if we choose a
        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
        # then we will never get a partial write of the buffer.
        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

        with rf, wf:
            for N in 9999, 73, 7574:
                try:
                    i = 0
                    while True:
                        # NOTE(review): under Python 2 ``bytes`` is ``str``,
                        # so this is the repr of a one-element list repeated
                        # N times rather than N copies of a single byte.
                        # The test only compares sent with received, so the
                        # exact payload does not matter.
                        msg = bytes([i % 26 + 97]) * N
                        sent.append(msg)
                        wf.write(msg)
                        i += 1

                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    # Only part of the last message was accepted.
                    sent[-1] = sent[-1][:e.characters_written]
                    received.append(rf.read())
                    msg = b'BLOCKED'
                    wf.write(msg)
                    sent.append(msg)

            # Drain the reader until the writer manages to flush fully.
            while True:
                try:
                    wf.flush()
                    break
                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.characters_written, 0)
                    received.append(rf.read())

            # Read until rf.read() returns None.
            received += iter(rf.read, None)

        sent, received = b''.join(sent), b''.join(received)
        self.assertEqual(sent, received)
        self.assertTrue(wf.closed)
        self.assertTrue(rf.closed)
3048
class CMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the C implementation of the io module."""

    # Message associated with the C implementation.
    # NOTE(review): no test in this part of the file reads it -- confirm
    # where it is consumed.
    shutdown_error = "RuntimeError: could not find io module state"

    # Module under test.
    io = io
Antoine Pitrou19690592009-06-12 20:14:08 +00003052
class PyMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the pure-Python implementation (pyio)."""

    # Message associated with the Python implementation.
    # NOTE(review): no test in this part of the file reads it -- confirm
    # where it is consumed.
    shutdown_error = "LookupError: unknown encoding: ascii"

    # Module under test.
    io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00003056
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003057
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Interaction between io objects and signals delivered during I/O.

    Concrete subclasses set ``io`` to the module under test.  Every test
    arms SIGALRM with signal.alarm(); pending alarms are always cancelled
    during cleanup so that a failing test cannot deliver a stray SIGALRM to
    the cleanup code or to whichever test runs next.
    """

    def setUp(self):
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        # Cancel any alarm still pending before restoring the old handler.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raise ZeroDivisionError from the signal handler.
        1 // 0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
                     'issue #12429: skip test on FreeBSD <= 7')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit of data written repeatedly, *bytes* the byte
        string expected to appear on the pipe and *fdopen_kwargs* the
        arguments passed to open() for the write end."""
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        # Pre-bind so the cleanup code works even when open() fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            try:
                with self.assertRaises(ZeroDivisionError):
                    wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1))
            finally:
                # Do not leave the alarm armed if the write did not fail
                # the expected way.
                signal.alarm(0)
                t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check the outcome of a write() issued from a signal handler
        while another write() on the same object is in progress.

        Either the reentrant call is rejected with a RuntimeError whose
        message starts with "reentrant call", or the handler gets to raise
        its ZeroDivisionError."""
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1//0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Make sure on_alarm() cannot run against a closed file.
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read() to text."""
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Pre-bind so the cleanup code works even when open() fails; the
        # pipe descriptors are ours to close because closefd is False.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # alarm_handler() writes to w, so cancel before closing it.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupterd_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupterd_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is a one-element string (bytes or text) that is repeated to
        build the data written."""
        select = support.import_module("select")
        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        # One-element list so that _read() can report a failure.
        error = [None]
        def _read():
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                error[0] = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            t.start()
        signal.signal(signal.SIGALRM, alarm1)
        # Pre-bind so the cleanup code works even when open() fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            self.assertEqual(N, wio.write(item * N))
            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error[0])
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # A late alarm would otherwise re-arm itself or start the
            # reader thread against closed descriptors.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupterd_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupterd_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3253
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003254
class CSignalsTest(SignalsTest):
    """SignalsTest bound to the C implementation of the io module."""

    # Module under test.
    io = io
3257
class PySignalsTest(SignalsTest):
    """SignalsTest bound to the pure-Python implementation (pyio)."""

    # Module under test.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3265
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003266
def test_main():
    """Bind each test class to its io implementation and run the suite.

    Classes whose name starts with "C" receive the names of the C ``io``
    module, those starting with "Py" the names of ``pyio``; in both cases
    the matching C*/Py* mock classes are added under their unprefixed name.
    Other classes are left untouched.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {member: getattr(io, member) for member in all_members}
    py_io_ns = {member: getattr(pyio, member) for member in all_members}

    # The implementation-specific mocks live at module level under a
    # "C"/"Py" prefix; expose them under the plain mock name.
    module_globals = globals()
    for mock in mocks:
        c_io_ns[mock.__name__] = module_globals["C" + mock.__name__]
    for mock in mocks:
        py_io_ns[mock.__name__] = module_globals["Py" + mock.__name__]

    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    for test in tests:
        if test.__name__.startswith("C"):
            namespace = c_io_ns
        elif test.__name__.startswith("Py"):
            namespace = py_io_ns
        else:
            continue
        for attr_name, attr_value in namespace.items():
            setattr(test, attr_name, attr_value)

    support.run_unittest(*tests)

if __name__ == "__main__":
    test_main()