blob: 2914a805b5272dc827780da9eeeca72b27644778 [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +030032import warnings
Antoine Pitrou19690592009-06-12 20:14:08 +000033import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000034import signal
35import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000036from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000037from collections import deque
Antoine Pitrou78e761e2012-10-16 22:57:11 +020038from UserList import UserList
Antoine Pitrou19690592009-06-12 20:14:08 +000039from test import test_support as support
Serhiy Storchaka354d50e2013-02-03 17:10:42 +020040import contextlib
Christian Heimes1a6387e2008-03-26 12:49:49 +000041
42import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000043import io # C implementation of io
44import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000045try:
46 import threading
47except ImportError:
48 threading = None
Antoine Pitrou5aa7df32011-11-21 20:16:44 +010049try:
50 import fcntl
51except ImportError:
52 fcntl = None
Antoine Pitrou19690592009-06-12 20:14:08 +000053
54__metaclass__ = type
55bytes = support.py3k_bytes
56
def _default_chunk_size():
    """Return the chunk size reported by a freshly opened text file.

    This module's own source file is opened in text mode (latin1) only to
    obtain a text wrapper whose ``_CHUNK_SIZE`` attribute can be read.
    """
    wrapper = io.open(__file__, "r", encoding="latin1")
    try:
        return wrapper._CHUNK_SIZE
    finally:
        wrapper.close()
Christian Heimes1a6387e2008-03-26 12:49:49 +000061
62
class MockRawIOWithoutRead:
    """Raw I/O mock that deliberately has no read() method.

    Leaving read() out exercises the default RawIO.read(), which calls
    readinto().  Incoming data is scripted through *read_stack*: successive
    readinto() calls hand out its entries in order, and a ``None`` entry makes
    readinto() return None.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)  # scripted chunks still to serve
        self._write_stack = []               # bytes copies of written data
        self._reads = 0                      # number of read attempts
        self._extraneous_reads = 0           # reads after the script ran out

    def write(self, b):
        """Record a bytes copy of *b* and report all of it as written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        # wrong but we gotta return something
        return 0

    def tell(self):
        # just as wrong as seek(), for the same reason
        return 0

    def readinto(self, buf):
        """Copy the next scripted chunk into *buf*.

        Returns the number of bytes copied, 0 once the script is exhausted,
        or None when the next scripted entry is None.  A chunk larger than
        *buf* is split; the remainder stays at the head of the script.
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            self._read_stack.pop(0)
            return None
        size = len(chunk)
        if size > capacity:
            # Only part of the chunk fits: fill the buffer, keep the rest.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        self._read_stack.pop(0)
        buf[:size] = chunk
        return size

    def truncate(self, pos=None):
        return pos
118
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead combined with the C implementation's RawIOBase."""
121
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead combined with the Python implementation's RawIOBase."""
124
125
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus a scripted read().

    read() ignores the requested size and returns the next entry of the read
    stack as-is.
    """

    def read(self, n=None):
        """Return the next scripted entry, or b"" once none are left."""
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # An empty stack is the only expected failure; a bare ``except``
            # would also hide KeyboardInterrupt and genuine bugs.
            self._extraneous_reads += 1
            return b""
135
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with the C implementation's RawIOBase."""
138
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with the Python implementation's RawIOBase."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000141
142
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods report deliberately bogus results."""

    def write(self, b):
        # Report twice the count that MockRawIO.write() returned.
        written = MockRawIO.write(self, b)
        return written * 2

    def read(self, n=None):
        # Return whatever MockRawIO.read() produced, repeated twice.
        chunk = MockRawIO.read(self, n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Perform the real transfer, then report five times the buffer size.
        MockRawIO.readinto(self, buf)
        return 5 * len(buf)
158 return len(buf) * 5
159
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with the C implementation's RawIOBase."""
162
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with the Python implementation's RawIOBase."""
165
166
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() marks it closed and then raises IOError.

    Later close() calls do nothing.
    """
    closed = 0

    def close(self):
        if self.closed:
            return
        self.closed = 1
        raise IOError
173 raise IOError
174
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with the C implementation's RawIOBase."""
177
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with the Python implementation's RawIOBase."""
180
181
class MockFileIO:
    """Mixin that logs the size of every read in ``read_history``.

    It delegates to the next class in the MRO through super(), so it must be
    combined with a class providing __init__(data), read() and readinto().
    """

    def __init__(self, data):
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        """Delegate the read and log its length (None for a None result)."""
        result = super(MockFileIO, self).read(n)
        if result is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(result))
        return result

    def readinto(self, b):
        """Delegate readinto() and log the value it returned."""
        count = super(MockFileIO, self).readinto(b)
        self.read_history.append(count)
        return count
Christian Heimes1a6387e2008-03-26 12:49:49 +0000197
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO recording reads on top of the C implementation's BytesIO."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000200
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO recording reads on top of the Python implementation's BytesIO."""
203
204
class MockNonBlockWriterIO:
    """Raw writer mock that can pretend a write would block.

    After block_on(char), a write whose data contains *char* is cut short just
    before it.  When *char* is the very first byte, nothing is written, the
    blocker is cancelled and write() returns None.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything recorded so far and empty the record."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record *b*, stopping early at the blocker character if one is set.

        Returns the number of bytes recorded, or None when nothing could be
        written because the data starts with the blocker.
        """
        data = bytes(b)
        blocker = self._blocker_char
        if blocker:
            try:
                pos = data.index(blocker)
            except ValueError:
                pos = None
            if pos is not None:
                if pos > 0:
                    # write data up to the first blocker
                    self._write_stack.append(data[:pos])
                    return pos
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(data)
        return len(data)
248
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO combined with the C implementation's RawIOBase."""
    # Exposes the C implementation's BlockingIOError as a class attribute.
    BlockingIOError = io.BlockingIOError
251
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO combined with the Python implementation's RawIOBase."""
    # Exposes the Python implementation's BlockingIOError as a class attribute.
    BlockingIOError = pyio.BlockingIOError
254
Christian Heimes1a6387e2008-03-26 12:49:49 +0000255
class IOTest(unittest.TestCase):
    """General tests for opening, reading, writing and closing files.

    NOTE(review): self.open, self.FileIO, self.BytesIO, self.IOBase,
    self.RawIOBase, self.BufferedIOBase, self.TextIOBase and
    self.MockRawIOWithoutRead are not defined in this class; presumably they
    are supplied per implementation (C / Python) elsewhere in the file.
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed sequence of write/seek/tell/truncate checks on *f*.

        The sequence leaves b"hello world\\n" in *f*.
        """
        self.assertEqual(f.write(b"blah."), 5)
        # truncate() is expected to leave the position where it was.
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed sequence of read/readinto/seek checks on *f*.

        *f* must contain b"hello world\\n".  With *buffered* true, argument-less
        read() calls are checked as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        """Check seek/tell/write/truncate/read around offset LARGE on *f*."""
        # assertTrue rather than a bare assert, so the precondition is still
        # checked when Python runs with -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform.startswith('win') or sys.platform == 'darwin':
            support.requires(
                'largefile',
                'test requires %s bytes and a long time to run' % self.LARGE)
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1 // 0
            except ZeroDivisionError:
                # The file must be closed even when the body raised.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1 // 0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyFileIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyFileIO, self).close()
            def flush(self):
                record.append(3)
                super(MyFileIO, self).flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        # Expected call order on destruction: __del__, close, flush.
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check that destroying a subclass of *base* calls __del__, close()
        and flush() in that order, with instance attributes still readable."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super(MyIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super(MyIO, self).close()
            def flush(self):
                record.append(self.on_flush)
                super(MyIO, self).flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array(b'i', range(10))
        n = len(a.tostring())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            # Named fd_file (not ``file``) to avoid shadowing the builtin.
            fd_file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(fd_file.read(), "egg\n")
            fd_file.seek(0)
            fd_file.close()
            self.assertRaises(ValueError, fd_file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            # Close the wrapper explicitly instead of leaving it to the
            # garbage collector.
            with self.open(f.fileno(), "r", closefd=False) as fd_file:
                self.assertEqual(fd_file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def check_flush_error_on_close(self, *args, **kwargs):
        """Open a file with self.open(*args, **kwargs) and check its close().

        Test that the file is closed despite failed flush and that flush()
        is called before file closed.
        """
        f = self.open(*args, **kwargs)
        closed = []
        def bad_flush():
            # Record whether the file was already closed when flush() ran.
            closed[:] = [f.closed]
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed
        self.assertTrue(f.closed)
        self.assertTrue(closed)      # flush() called
        self.assertFalse(closed[0])  # flush() called before file closed
        f.flush = lambda: None  # break reference loop

    def test_flush_error_on_close(self):
        # Issue #5700: io.FileIO calls flush() after file closed
        # The same three scenarios (file name, owned fd, borrowed fd) are run
        # for each layer.  dict(...) keeps the keyword names native strings
        # despite unicode_literals.
        layers = (
            ('wb', dict(buffering=0)),  # raw file
            ('wb', dict()),             # buffered io
            ('w', dict()),              # text io
        )
        open_flags = os.O_WRONLY | os.O_CREAT
        for mode, extra in layers:
            self.check_flush_error_on_close(support.TESTFN, mode, **extra)
            fd = os.open(support.TESTFN, open_flags)
            self.check_flush_error_on_close(fd, mode, **extra)
            fd = os.open(support.TESTFN, open_flags)
            try:
                self.check_flush_error_on_close(fd, mode, closefd=False,
                                                **extra)
            finally:
                # With closefd=False the descriptor stays ours to close, even
                # when the check above fails.
                os.close(fd)

    def test_multi_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()

    def test_nonbuffered_textio(self):
        # The failed open() must not leave anything behind that emits a
        # warning when collected.
        with warnings.catch_warnings(record=True) as recorded:
            with self.assertRaises(ValueError):
                self.open(support.TESTFN, 'w', buffering=0)
            support.gc_collect()
        self.assertEqual(recorded, [])

    def test_invalid_newline(self):
        # Same check as above, for an invalid newline argument.
        with warnings.catch_warnings(record=True) as recorded:
            with self.assertRaises(ValueError):
                self.open(support.TESTFN, 'w', newline='invalid')
            support.gc_collect()
        self.assertEqual(recorded, [])
651
Hynek Schlawack877effc2012-05-25 09:24:18 +0200652
class CIOTest(IOTest):
    """IOTest plus a regression test specific to the C implementation."""

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        obj.obj = obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        support.gc_collect()
        # assertIsNone reports the surviving object directly on failure.
        self.assertIsNone(wr(), wr)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000672
class PyIOTest(IOTest):
    """IOTest variant in which test_array_writes is skipped."""

    # The skip reason below explains why this inherited test is disabled.
    test_array_writes = unittest.skip(
        "len(array.array) returns number of elements rather than bytelength")(
            IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000677
678
Antoine Pitrou19690592009-06-12 20:14:08 +0000679class CommonBufferedTests:
680 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
681
682 def test_detach(self):
683 raw = self.MockRawIO()
684 buf = self.tp(raw)
685 self.assertIs(buf.detach(), raw)
686 self.assertRaises(ValueError, buf.detach)
687
Benjamin Peterson53ae6142014-12-21 20:51:50 -0600688 repr(buf) # Should still work
689
Antoine Pitrou19690592009-06-12 20:14:08 +0000690 def test_fileno(self):
691 rawio = self.MockRawIO()
692 bufio = self.tp(rawio)
693
Ezio Melotti2623a372010-11-21 13:34:58 +0000694 self.assertEqual(42, bufio.fileno())
Antoine Pitrou19690592009-06-12 20:14:08 +0000695
Zachary Ware1f702212013-12-10 14:09:20 -0600696 @unittest.skip('test having existential crisis')
Antoine Pitrou19690592009-06-12 20:14:08 +0000697 def test_no_fileno(self):
698 # XXX will we always have fileno() function? If so, kill
699 # this test. Else, write it.
700 pass
701
702 def test_invalid_args(self):
703 rawio = self.MockRawIO()
704 bufio = self.tp(rawio)
705 # Invalid whence
706 self.assertRaises(ValueError, bufio.seek, 0, -1)
707 self.assertRaises(ValueError, bufio.seek, 0, 3)
708
709 def test_override_destructor(self):
710 tp = self.tp
711 record = []
712 class MyBufferedIO(tp):
713 def __del__(self):
714 record.append(1)
715 try:
716 f = super(MyBufferedIO, self).__del__
717 except AttributeError:
718 pass
719 else:
720 f()
721 def close(self):
722 record.append(2)
723 super(MyBufferedIO, self).close()
724 def flush(self):
725 record.append(3)
726 super(MyBufferedIO, self).flush()
727 rawio = self.MockRawIO()
728 bufio = MyBufferedIO(rawio)
729 writable = bufio.writable()
730 del bufio
731 support.gc_collect()
732 if writable:
733 self.assertEqual(record, [1, 2, 3])
734 else:
735 self.assertEqual(record, [1, 2])
736
737 def test_context_manager(self):
738 # Test usability as a context manager
739 rawio = self.MockRawIO()
740 bufio = self.tp(rawio)
741 def _with():
742 with bufio:
743 pass
744 _with()
745 # bufio should now be closed, and using it a second time should raise
746 # a ValueError.
747 self.assertRaises(ValueError, _with)
748
749 def test_error_through_destructor(self):
750 # Test that the exception state is not modified by a destructor,
751 # even if close() fails.
752 rawio = self.CloseFailureIO()
753 def f():
754 self.tp(rawio).xyzzy
755 with support.captured_output("stderr") as s:
756 self.assertRaises(AttributeError, f)
757 s = s.getvalue().strip()
758 if s:
759 # The destructor *may* have printed an unraisable error, check it
760 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000761 self.assertTrue(s.startswith("Exception IOError: "), s)
762 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000763
764 def test_repr(self):
765 raw = self.MockRawIO()
766 b = self.tp(raw)
767 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
768 self.assertEqual(repr(b), "<%s>" % clsname)
769 raw.name = "dummy"
770 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
771 raw.name = b"dummy"
772 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000773
def test_flush_error_on_close(self):
    # The buffered file must end up closed even though flush() failed,
    # and flush() must have been called before anything was closed.
    rawio = self.MockRawIO()
    closed_at_flush = []

    def failing_flush():
        # Snapshot the closed state of both objects at flush time.
        closed_at_flush[:] = [bufio.closed, rawio.closed]
        raise IOError()

    rawio.flush = failing_flush
    bufio = self.tp(rawio)
    self.assertRaises(IOError, bufio.close)  # exception not swallowed
    self.assertTrue(bufio.closed)
    self.assertTrue(rawio.closed)
    self.assertTrue(closed_at_flush)  # flush() called
    self.assertFalse(closed_at_flush[0])  # flush() called before file closed
    self.assertFalse(closed_at_flush[1])
    rawio.flush = lambda: None  # break reference loop
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600791
def test_close_error_on_close(self):
    # Both flush() and the raw close() fail; the error that reaches the
    # caller must be the one raised by close().
    def failing_flush():
        raise IOError('flush')

    def failing_close():
        raise IOError('close')

    rawio = self.MockRawIO()
    rawio.close = failing_close
    bufio = self.tp(rawio)
    bufio.flush = failing_flush
    with self.assertRaises(IOError) as caught:  # exception not swallowed
        bufio.close()
    self.assertEqual(caught.exception.args, ('close',))
    self.assertFalse(bufio.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000805
def test_multi_close(self):
    # close() may be called repeatedly; flushing afterwards raises
    # ValueError.
    bufio = self.tp(self.MockRawIO())
    for _ in range(3):
        bufio.close()
    self.assertRaises(ValueError, bufio.flush)
813
def test_readonly_attributes(self):
    # The ``raw`` attribute cannot be rebound to another raw stream.
    bufio = self.tp(self.MockRawIO())
    replacement = self.MockRawIO()
    with self.assertRaises((AttributeError, TypeError)):
        bufio.raw = replacement
820
Christian Heimes1a6387e2008-03-26 12:49:49 +0000821
class SizeofTest:
    # Mixin: expects the concrete test class to provide ``tp`` and
    # ``MockRawIO``.

    @support.cpython_only
    def test_sizeof(self):
        # sys.getsizeof() must grow by exactly the difference between the
        # two buffer sizes.
        small, large = 4096, 8192
        small_bufio = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(small_bufio) - small
        large_bufio = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(large_bufio), overhead + large)
834
835
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Tests for the buffered reader type stored in ``self.tp``; the
    # subclasses below set ``tp`` to the implementation under test.
    read_mode = "rb"

    def test_constructor(self):
        # __init__() can be called again on an existing object, with or
        # without an explicit buffer size.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        for size in (1024, 16):
            bufio.__init__(rawio, buffer_size=size)
        self.assertEqual(b"abc", bufio.read())
        # Zero and negative buffer sizes are rejected.
        for size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=size)
        # A subsequent valid __init__() on a new raw stream works again.
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_uninitialized(self):
        # Creating and dropping an object whose __init__() never ran must
        # not blow up.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        # Reading before __init__() fails with a recognisable error.
        self.assertRaisesRegexp((ValueError, AttributeError),
                                'uninitialized|has no attribute',
                                bufio.read, 0)
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.read(0), b'')

    def test_read(self):
        # read(None) and a read of exactly the available size both return
        # all of the data.
        for size in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(size))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        # Each step: (expected data, size passed to read1(), expected
        # value of rawio._reads afterwards).
        steps = [
            (b"b", 1, 1),
            (b"c", 100, 1),
            (b"d", 100, 2),
            (b"efg", 100, 3),
            (b"", 100, 4),
        ]
        for expected, size, raw_reads in steps:
            self.assertEqual(expected, bufio.read1(size))
            self.assertEqual(rawio._reads, raw_reads)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        target = bytearray(2)
        # Each step: (byte count returned by readinto(), buffer contents
        # afterwards).  A short read leaves the old tail byte in place.
        steps = [
            (2, b"ab"),
            (2, b"cd"),
            (2, b"ef"),
            (1, b"gf"),
            (0, b"gf"),
        ]
        for count_read, contents in steps:
            self.assertEqual(bufio.readinto(target), count_read)
            self.assertEqual(target, contents)

    def test_readlines(self):
        def fresh_bufio():
            return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))

        every_line = [b"abc\n", b"d\n", b"ef"]
        self.assertEqual(fresh_bufio().readlines(), every_line)
        self.assertEqual(fresh_bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(fresh_bufio().readlines(None), every_line)

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each case: buffer size, sizes passed to bufio.read(), and the
        # read history expected on the raw object.
        cases = [
            [100, [3, 1, 4, 8], [dlen, 0]],
            [100, [3, 3, 3], [dlen]],
            [4, [1, 2, 4, 2], [4, 4, 1]],
        ]

        for bufsize, buffered_sizes, raw_sizes in cases:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            offset = 0
            for nbytes in buffered_sizes:
                expected = data[offset:offset + nbytes]
                self.assertEqual(bufio.read(nbytes), expected)
                offset += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        chunks = (b"abc", b"d", None, b"efg", None, None, None)
        bufio = self.tp(self.MockRawIO(chunks))
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        # Same idea directly on the raw object's readall().
        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        # Asking for more than is available returns what there is.
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        # read() without a size returns everything.
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode,
                           buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []

                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for size in cycle([1, 19]):
                            chunk = bufio.read(size)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise

                threads = [threading.Thread(target=reader)
                           for _ in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02)  # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                # Every byte value must have been read exactly N times.
                for value in range(256):
                    needle = bytes(bytearray([value]))
                    self.assertEqual(combined.count(needle), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # seek() and tell() on top of a misbehaving raw object must raise
        # IOError.
        bufio = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            wanted = b"x" * n
            # Simple case: one raw read is enough to satisfy the request.
            rawio = self.MockRawIO([wanted])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), wanted)
            self.assertEqual(rawio._extraneous_reads, 0,
                "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), wanted)
            self.assertEqual(rawio._extraneous_reads, 0,
                "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1024
1025
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the shared reader tests against io.BufferedReader and adds
    # checks that only apply to this implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                              bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After every failed __init__() the object must refuse to read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # FileIO(..., "w+b") creates support.TESTFN on disk; register its
        # removal so the test does not leave the file behind.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Self-reference so that only the cycle collector can free it.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertTrue(wr() is None, wr)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegexp(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1071
1072
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared reader tests against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001075
1076
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Tests for the buffered writer type stored in ``self.tp``; the
    # subclasses below set ``tp`` to the implementation under test.
    write_mode = "wb"

    def test_constructor(self):
        # __init__() can be called again on an existing object, with or
        # without an explicit buffer size.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        for size in (1024, 16):
            bufio.__init__(rawio, buffer_size=size)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        # Zero and negative buffer sizes are rejected.
        for size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=size)
        # A subsequent valid __init__() makes the object writable again.
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_uninitialized(self):
        # Creating and dropping an object whose __init__() never ran must
        # not blow up.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        # Writing before __init__() fails with a recognisable error.
        self.assertRaisesRegexp((ValueError, AttributeError),
                                'uninitialized|has no attribute',
                                bufio.write, b'')
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.write(b''), 0)

    def test_detach_flush(self):
        # detach() must push buffered data down to the raw object.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for start in range(0, len(contents), 3):
            bufio.write(contents[start:start + 3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # ``intermediate_func`` is called with the buffered object after
        # every single write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)

        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size

        sizes = gen_sizes()
        while n < len(contents):
            # Never write past the end of ``contents``.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n + size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Absolute seeks around the current position, ending where we
        # started.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)

        # Relative seeks forward and back, then restore the position.
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
                         b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        # Overwrite the first two bytes, then go back to the end.
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        lines = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(lines)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # Same as test_writelines, with a UserList instead of a list.
        lines = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(lines)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        # Non-bytes items and a non-iterable argument both raise TypeError.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)

    def test_destructor(self):
        # Dropping the last reference must flush buffered data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        #
        # The test creates support.TESTFN on disk; register its removal so
        # the file is not left behind, whether the test passes or fails.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # truncate() does not move the file position.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            # Pre-cut the payload into alternating 1- and 19-byte chunks.
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n + size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode,
                           buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []

                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=f) for x in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02)  # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Every byte value must have been written exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # seek(), tell() and an overflowing write() on top of a misbehaving
        # raw object must raise IOError.
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must emit the
        # max_buffer_size DeprecationWarning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        # A failing raw write() during close() must propagate, and the
        # buffered object must still end up closed.
        raw = self.MockRawIO()

        def bad_write(b):
            raise IOError()

        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(IOError, b.close)  # exception not swallowed
        self.assertTrue(b.closed)
1332
Antoine Pitrou19690592009-06-12 20:14:08 +00001333
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared writer tests against io.BufferedWriter and adds
    # checks that only apply to this implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                              bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After every failed __init__() the object must refuse to write.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # FileIO(..., "w+b") creates support.TESTFN on disk; register its
        # removal so the test does not leave the file behind.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Self-reference so that only the cycle collector can free it.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1376
Antoine Pitrou19690592009-06-12 20:14:08 +00001377
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared writer tests against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001380
1381class BufferedRWPairTest(unittest.TestCase):
1382
def test_constructor(self):
    # A freshly built pair is open.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    pair = self.tp(reader, writer)
    self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001386
def test_uninitialized(self):
    # Creating and dropping a pair whose __init__() never ran must not
    # blow up.
    pair = self.tp.__new__(self.tp)
    del pair
    pair = self.tp.__new__(self.tp)
    # Before __init__() both directions fail with a recognisable error.
    expected_errors = (ValueError, AttributeError)
    expected_message = 'uninitialized|has no attribute'
    self.assertRaisesRegexp(expected_errors, expected_message,
                            pair.read, 0)
    self.assertRaisesRegexp(expected_errors, expected_message,
                            pair.write, b'')
    # After __init__() the pair is usable.
    pair.__init__(self.MockRawIO(), self.MockRawIO())
    self.assertEqual(pair.read(0), b'')
    self.assertEqual(pair.write(b''), 0)
1400
def test_detach(self):
    # detach() is not supported on a pair.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    pair = self.tp(reader, writer)
    self.assertRaises(self.UnsupportedOperation, pair.detach)
1404
def test_constructor_max_buffer_size_deprecation(self):
    # Passing a fourth positional argument must emit the max_buffer_size
    # DeprecationWarning.
    expected_warning = ("max_buffer_size is deprecated", DeprecationWarning)
    with support.check_warnings(expected_warning):
        self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001409
def test_constructor_with_not_readable(self):
    # A reader that reports readable() == False is rejected with IOError.
    class NotReadable(MockRawIO):
        def readable(self):
            return False

    unreadable, writer = NotReadable(), self.MockRawIO()
    self.assertRaises(IOError, self.tp, unreadable, writer)
1416
def test_constructor_with_not_writeable(self):
    # A writer that reports writable() == False is rejected with IOError.
    class NotWriteable(MockRawIO):
        def writable(self):
            return False

    reader, unwritable = self.MockRawIO(), NotWriteable()
    self.assertRaises(IOError, self.tp, reader, unwritable)
1423
def test_read(self):
    # Sized reads followed by a read-to-end.
    pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
    for size, expected in ((3, b"abc"), (1, b"d")):
        self.assertEqual(pair.read(size), expected)
    self.assertEqual(pair.read(), b"ef")
    # read(None) returns everything from a fresh pair.
    pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
    self.assertEqual(pair.read(None), b"abc")
1432
def test_readlines(self):
    # readlines() with no hint, with hint=None and with a small hint.
    # Each call gets a fresh pair because readlines() consumes the reader.
    def pair():
        return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

    every_line = [b"abc\n", b"def\n", b"h"]
    self.assertEqual(pair().readlines(), every_line)
    # The second check used to repeat the first one verbatim; exercise the
    # explicit ``None`` hint instead, as BufferedReaderTest.test_readlines
    # does.
    self.assertEqual(pair().readlines(None), every_line)
    self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001438
def test_read1(self):
    # .read1() is delegated to the underlying reader object, so this test
    # can be shallow.
    reader = self.BytesIO(b"abcdef")
    pair = self.tp(reader, self.MockRawIO())
    self.assertEqual(pair.read1(3), b"abc")
1445
def test_readinto(self):
    # readinto() fills the whole five-byte buffer from the reader side.
    reader = self.BytesIO(b"abcdef")
    pair = self.tp(reader, self.MockRawIO())
    target = bytearray(5)
    self.assertEqual(pair.readinto(target), 5)
    self.assertEqual(target, b"abcde")
1452
def test_write(self):
    # Each write followed by a flush must reach the writer as its own
    # chunk.
    writer = self.MockRawIO()
    pair = self.tp(self.MockRawIO(), writer)
    for chunk in (b"abc", b"def"):
        pair.write(chunk)
        pair.flush()
    self.assertEqual(writer._write_stack, [b"abc", b"def"])
1462
def test_peek(self):
    # peek() must not consume data: the read that follows sees the same
    # bytes.
    reader = self.BytesIO(b"abcdef")
    pair = self.tp(reader, self.MockRawIO())
    peeked = pair.peek(3)
    self.assertTrue(peeked.startswith(b"abc"))
    self.assertEqual(pair.read(3), b"abc")
1468
def test_readable(self):
    # A pair built from two mock raw streams reports itself readable.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    pair = self.tp(reader, writer)
    self.assertTrue(pair.readable())
1472
def test_writeable(self):
    # A pair built from two mock raw streams reports itself writable.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    pair = self.tp(reader, writer)
    self.assertTrue(pair.writable())
1476
def test_seekable(self):
    # BufferedRWPairs are never seekable, even if their readers and
    # writers are.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    pair = self.tp(reader, writer)
    self.assertFalse(pair.seekable())
1482
1483 # .flush() is delegated to the underlying writer object and has been
1484 # tested in the test_write method.
1485
def test_close_and_closed(self):
    # The closed flag flips from False to True across close().
    reader, writer = self.MockRawIO(), self.MockRawIO()
    pair = self.tp(reader, writer)
    self.assertFalse(pair.closed)
    pair.close()
    self.assertTrue(pair.closed)
1491
def test_reader_close_error_on_close(self):
    # When only the reader's close() fails, the error propagates but the
    # writer still ends up closed.
    def failing_close():
        reader_non_existing

    reader = self.MockRawIO()
    reader.close = failing_close
    writer = self.MockRawIO()
    pair = self.tp(reader, writer)

    with self.assertRaises(NameError) as caught:
        pair.close()
    self.assertIn('reader_non_existing', str(caught.exception))

    self.assertTrue(pair.closed)
    self.assertFalse(reader.closed)
    self.assertTrue(writer.closed)
1505
def test_writer_close_error_on_close(self):
    # When only the writer's close() fails, the error propagates, the
    # reader is closed and the pair itself reports not closed.
    def failing_close():
        writer_non_existing

    reader = self.MockRawIO()
    writer = self.MockRawIO()
    writer.close = failing_close
    pair = self.tp(reader, writer)

    with self.assertRaises(NameError) as caught:
        pair.close()
    self.assertIn('writer_non_existing', str(caught.exception))

    self.assertFalse(pair.closed)
    self.assertTrue(reader.closed)
    self.assertFalse(writer.closed)
1519
def test_reader_writer_close_error_on_close(self):
    # When both close() methods fail, the reader's error is the one that
    # surfaces and nothing is reported as closed.
    def failing_reader_close():
        reader_non_existing

    def failing_writer_close():
        writer_non_existing

    reader = self.MockRawIO()
    reader.close = failing_reader_close
    writer = self.MockRawIO()
    writer.close = failing_writer_close
    pair = self.tp(reader, writer)

    with self.assertRaises(NameError) as caught:
        pair.close()
    self.assertIn('reader_non_existing', str(caught.exception))

    self.assertFalse(pair.closed)
    self.assertFalse(reader.closed)
    self.assertFalse(writer.closed)
1536
def test_isatty(self):
    # The pair is a tty as soon as either half says it is one.
    class SelectableIsAtty(MockRawIO):
        def __init__(self, isatty):
            MockRawIO.__init__(self)
            self._isatty = isatty

        def isatty(self):
            return self._isatty

    # (reader answer, writer answer, expected answer of the pair)
    combinations = (
        (False, False, False),
        (True, False, True),
        (False, True, True),
        (True, True, True),
    )
    for reader_tty, writer_tty, expected in combinations:
        pair = self.tp(SelectableIsAtty(reader_tty),
                       SelectableIsAtty(writer_tty))
        if expected:
            self.assertTrue(pair.isatty())
        else:
            self.assertFalse(pair.isatty())
1557
def test_weakref_clearing(self):
    # Drop the pair first and its weak reference second.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    pair = self.tp(reader, writer)
    pair_ref = weakref.ref(pair)
    pair = None
    pair_ref = None  # Shouldn't segfault.
1563
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPair tests against the `io` implementation.
    tp = io.BufferedRWPair
1566
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPair tests against the `pyio` implementation.
    tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001569
1570
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Shared tests for BufferedRandom; concrete subclasses provide `tp`.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Run the constructor checks of both parent test classes.
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_constructor(self)

    def test_uninitialized(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_uninitialized(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        for chunk in (b"ddd", b"eee"):
            rw.write(chunk)
        self.assertFalse(raw._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"123f")
        rw.seek(0, 0)
        self.assertEqual(b"asdf123fl", rw.read())
        self.assertEqual(9, rw.tell())
        # (offset, whence, position expected afterwards)
        for offset, whence, position in ((-4, 2, 5), (2, 1, 7)):
            rw.seek(offset, whence)
            self.assertEqual(position, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        rw.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario; `read_func(bufio[, n])` is the read primitive
        # under test.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Modify the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        def _read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(_read)

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # A negative size means "as much as possible".
            if n >= 0:
                capacity = n
            else:
                capacity = 9999
            target = bytearray(capacity)
            n = bufio.readinto(target)
            return bytes(target[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = bufio.peek(n)
            if n != -1:
                data = data[:n]
            # Advance past the peeked data so that it behaves like a read.
            bufio.seek(len(data), 1)
            return data
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        for chunk in (b"123", b"45"):
            bufio.write(chunk)
            bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_threads(self)

    def test_writes_and_peek(self):
        def _plain_peek(bufio):
            bufio.peek(1)
        self.check_writes(_plain_peek)

        def _rewound_peek(bufio):
            # Step back one byte, peek, then restore the position.
            saved = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(saved, 0)
        self.check_writes(_rewound_peek)

    def test_writes_and_reads(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_reread_last)

    def test_writes_and_read1s(self):
        def _reread1_last(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_reread1_last)

    def test_writes_and_readintos(self):
        def _readinto_last(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_readinto_last)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in (1, 5):
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            end_position = overwrite_size + 1
            self.assertEqual(bufio.tell(), end_position)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), end_position)
            expected = (b"A" + b"B" * overwrite_size
                        + b"A" * (9 - overwrite_size))
            self.assertEqual(raw.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards /
        # writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for i in range(0, size):
            for j in range(i, size):
                raw = self.BytesIO(original)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                # The later assignment wins when i == j, as in mutate().
                expected = bytearray(original)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        # the read buffer gets filled
        self.assertEqual(bufio.read(2), b"AA")
        self.assertEqual(bufio.truncate(), 2)
        # the write buffer increases
        self.assertEqual(bufio.write(b"BB"), 2)
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw, self.tp(raw, 100) as f:
            f.write(b"1")
            self.assertEqual(f.read(1), b'b')
            f.write(b'2')
            self.assertEqual(f.read1(1), b'd')
            f.write(b'3')
            one_byte = bytearray(1)
            f.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            f.write(b'4')
            self.assertEqual(f.peek(1), b'h')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw, self.tp(raw, 100) as f:
            self.assertEqual(f.read(1), b'a')
            f.write(b"2")
            self.assertEqual(f.read(1), b'c')
            f.flush()
            self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as raw, self.tp(raw) as f:
            # Each step overwrites one byte, then reads the rest of the line.
            steps = ((b'1', b'b\n'), (b'2', b'def\n'), (b'3', b'\n'))
            for written, rest_of_line in steps:
                f.write(written)
                self.assertEqual(f.readline(), rest_of_line)
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1795
R David Murray5b2cf5e2013-02-23 22:11:21 -05001796
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
                          BufferedRandomTest, SizeofTest):
    # BufferedRandom tests bound to the `io` implementation.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        acceptable = (OverflowError, MemoryError, ValueError)
        self.assertRaises(acceptable, bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        for parent in (CBufferedReaderTest, CBufferedWriterTest):
            parent.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        stream = io.BytesIO()
        with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
            self.tp(stream, 1024, 1024, 1024)
1819
1820
class PyBufferedRandomTest(BufferedRandomTest):
    # BufferedRandom tests bound to the `pyio` implementation.
    tp = pyio.BufferedRandom
1823
1824
Christian Heimes1a6387e2008-03-26 12:49:49 +00001825# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1826# properties:
1827# - A single output character can correspond to many bytes of input.
1828# - The number of input bytes to complete the character can be
1829# undetermined until the last input byte is received.
1830# - The number of input bytes can vary depending on previous input.
1831# - A single input byte can correspond to many characters of output.
1832# - The number of output characters can be undetermined until the
1833# last input byte is received.
1834# - The number of output characters can vary depending on previous input.
1835
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I=1, O=1 and no buffered input."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) with flags encoding I and O."""
        i, o = self.i ^ 1, self.o ^ 1  # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        buffered, flags = state
        self.buffer = bytearray(buffered)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed *input* to the decoder and return the text produced so far.

        *input* is converted to a bytearray before iterating so that every
        element is an integer whatever bytes-like type was passed in; the
        period test therefore does not depend on how the input type
        iterates.  When *final* is true, leftover buffered input is emitted
        as a last word.
        """
        output = ''
        period = ord('.')
        for b in bytearray(input):
            if self.i == 0:  # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer:  # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i...' and 'o...' words update I and O and produce no output.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o  # pad out with hyphens
            if self.o:
                output = output[:self.o]  # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # The codec lookup below only answers while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: serve 'test_decoder' while enabled.

        Returns None (implicitly) for any other name or while disabled.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
1920
# Register the lookup function of the decoder class defined above with the
# codecs machinery.  It is disabled by default; tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1924
1925
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases, each on a fresh decoder.
        for raw_input, at_eof, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(raw_input, at_eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001968
1969class TextIOWrapperTest(unittest.TestCase):
1970
def setUp(self):
    # Start without a leftover test file, then prepare the sample data:
    # raw bytes with mixed line endings and the same text using "\n" only.
    support.unlink(support.TESTFN)
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001975
def tearDown(self):
    # Remove the scratch file a test may have left behind.
    scratch_path = support.TESTFN
    support.unlink(scratch_path)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001978
def test_constructor(self):
    # Re-running __init__ on an existing wrapper must reconfigure it.
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)

    text.__init__(buffered, encoding="latin1", newline="\r\n")
    self.assertEqual(text.encoding, "latin1")
    self.assertEqual(text.line_buffering, False)

    text.__init__(buffered, encoding="utf8", line_buffering=True)
    self.assertEqual(text.encoding, "utf8")
    self.assertEqual(text.line_buffering, True)
    self.assertEqual("\xe9\n", text.readline())

    # Invalid newline arguments: wrong type, then wrong value.
    self.assertRaises(TypeError, text.__init__, buffered, newline=42)
    self.assertRaises(ValueError, text.__init__, buffered, newline='xyzzy')
1992
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    text = self.TextIOWrapper(buffered)
    # detach() hands back the very buffer object that was wrapped.
    self.assertIs(text.detach(), buffered)

    text = self.TextIOWrapper(buffered, encoding="ascii")
    text.write("howdy")
    self.assertFalse(raw.getvalue())
    # Detaching pushes the pending text down to the raw stream ...
    text.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    # ... and a second detach is an error.
    self.assertRaises(ValueError, text.detach)

    # Operations independent of the detached stream should still work
    repr(text)
    self.assertEqual(text.encoding, "ascii")
    self.assertEqual(text.errors, "strict")
    self.assertFalse(text.line_buffering)
2011
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    # Without a name on the raw stream only the encoding is shown.
    expected = "<%s.TextIOWrapper encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    # A unicode name is rendered with its u'' prefix.
    raw.name = "dummy"
    expected = "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    # A bytes name is rendered without the prefix.
    raw.name = b"dummy"
    expected = "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    text.buffer.detach()
    repr(text)  # Should not raise an exception
2028
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)

    # (text written, raw content expected right after the write)
    steps = (
        ("X", b""),                # No flush happened
        ("Y\nZ", b"XY\nZ"),        # All got flushed
        ("A\rB", b"XY\nZA\rB"),
    )
    for written, raw_content in steps:
        text.write(written)
        self.assertEqual(raw.getvalue(), raw_content)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002039
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()
    explicit = self.TextIOWrapper(raw, encoding="utf8")
    self.assertEqual(explicit.encoding, "utf8")

    defaulted = self.TextIOWrapper(raw)
    self.assertTrue(defaulted.encoding is not None)
    # lookup() raises if the default encoding name is unknown.
    codecs.lookup(defaulted.encoding)
2048
def test_encoding_errors_reading(self):
    def ascii_reader(**options):
        # Fresh wrapper over data containing one non-ASCII byte.
        raw = self.BytesIO(b"abc\n\xff\n")
        return self.TextIOWrapper(raw, encoding="ascii", **options)

    # (1) default
    self.assertRaises(UnicodeError, ascii_reader().read)
    # (2) explicit strict
    self.assertRaises(UnicodeError, ascii_reader(errors="strict").read)
    # (3) ignore
    self.assertEqual(ascii_reader(errors="ignore").read(), "abc\n\n")
    # (4) replace
    self.assertEqual(ascii_reader(errors="replace").read(),
                     "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002066
def test_encoding_errors_writing(self):
    # (1) default
    raw = self.BytesIO()
    text = self.TextIOWrapper(raw, encoding="ascii")
    self.assertRaises(UnicodeError, text.write, "\xff")

    # (2) explicit strict
    raw = self.BytesIO()
    text = self.TextIOWrapper(raw, encoding="ascii", errors="strict")
    self.assertRaises(UnicodeError, text.write, "\xff")

    # (3) ignore and (4) replace: the offending character is dropped or
    # replaced by "?" respectively.
    lenient_cases = (
        ("ignore", b"abcdef\n"),
        ("replace", b"abc?def\n"),
    )
    for handler, encoded in lenient_cases:
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", errors=handler,
                                  newline="\n")
        text.write("abc\xffdef\n")
        text.flush()
        self.assertEqual(raw.getvalue(), encoded)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002090
def test_newlines(self):
    input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

    # Each entry pairs a newline argument with the lines expected back.
    tests = [
        [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
        [ '', input_lines ],
        [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
        [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
        [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    def collect_by_reads(textio):
        # Read two characters, then the remainder of the line, until EOF.
        collected = []
        while True:
            head = textio.read(2)
            if head == '':
                break
            self.assertEqual(len(head), 2)
            collected.append(head + textio.readline())
        return collected

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        # XXX: str.encode() should return bytes
        data = bytes(''.join(input_lines).encode(encoding))
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        got_lines = collect_by_reads(textio)
                    else:
                        got_lines = list(textio)

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002132
def test_newlines_input(self):
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    # (newline argument, lines expected from readlines())
    cases = [
        (None, normalized.decode("ascii").splitlines(True)),
        ("", testdata.decode("ascii").splitlines(True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        raw = self.BytesIO(testdata)
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        self.assertEqual(text.readlines(), expected)
        # Reading everything at once must give the same text.
        text.seek(0)
        self.assertEqual(text.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002148
def test_newlines_output(self):
    # Bytes expected on the raw stream for each newline argument.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
        }
    # newline=None is expected to behave like the platform's os.linesep.
    tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
    pieces = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in tests:
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        for piece in pieces:
            text.write(piece)
        text.flush()
        self.assertEqual(raw.closed, False)
        self.assertEqual(raw.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00002166
def test_destructor(self):
    # Dropping the wrapper must flush the pending text and close the
    # underlying stream; close() records what the stream held.
    seen_at_close = []
    base = self.BytesIO

    class MyBytesIO(base):
        def close(self):
            seen_at_close.append(self.getvalue())
            base.close(self)

    raw = MyBytesIO()
    text = self.TextIOWrapper(raw, encoding="ascii")
    text.write("abc")
    del text
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Antoine Pitrou19690592009-06-12 20:14:08 +00002180
def test_override_destructor(self):
    # Overridden __del__, close() and flush() must run in that order.
    calls = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            try:
                inherited_del = super(MyTextIO, self).__del__
            except AttributeError:
                # The parent defines no __del__: nothing to chain to.
                return
            inherited_del()

        def close(self):
            calls.append(2)
            super(MyTextIO, self).close()

        def flush(self):
            calls.append(3)
            super(MyTextIO, self).flush()

    raw = self.BytesIO()
    text = MyTextIO(raw, encoding="ascii")
    del text
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2203
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    report = captured.getvalue().strip()
    if not report:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(report.splitlines()), 1)
    self.assertTrue(report.startswith("Exception IOError: "), report)
    self.assertTrue(report.endswith(" ignored"), report)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002218
2219 # Systematic tests of the text I/O API
2220
def test_basic_io(self):
    # Round-trip write/read/seek/tell for several encodings while forcing
    # a range of values for the wrapper's _CHUNK_SIZE.
    chunk_sizes = (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65)
    encodings = ("ascii", "latin1", "utf8")  # , "utf-16-be", "utf-16-le"
    for chunksize in chunk_sizes:
        for enc in encodings:
            writer = self.open(support.TESTFN, "w+", encoding=enc)
            writer._CHUNK_SIZE = chunksize
            self.assertEqual(writer.write("abc"), 3)
            writer.close()

            stream = self.open(support.TESTFN, "r+", encoding=enc)
            stream._CHUNK_SIZE = chunksize
            self.assertEqual(stream.tell(), 0)
            self.assertEqual(stream.read(), "abc")
            end_cookie = stream.tell()
            self.assertEqual(stream.seek(0), 0)
            self.assertEqual(stream.read(None), "abc")
            stream.seek(0)
            self.assertEqual(stream.read(2), "ab")
            self.assertEqual(stream.read(1), "c")
            self.assertEqual(stream.read(1), "")
            self.assertEqual(stream.read(), "")
            self.assertEqual(stream.tell(), end_cookie)
            self.assertEqual(stream.seek(0), 0)
            self.assertEqual(stream.seek(0, 2), end_cookie)
            # Append after the existing text, then read the addition back.
            self.assertEqual(stream.write("def"), 3)
            self.assertEqual(stream.seek(end_cookie), end_cookie)
            self.assertEqual(stream.read(), "def")
            if enc.startswith("utf"):
                self.multi_line_test(stream, enc)
            stream.close()
2249
def multi_line_test(self, f, enc):
    # Helper: truncate ``f``, write lines of assorted lengths while noting
    # tell() before each one, then check that reading the file back yields
    # the same (position, line) pairs.  ``enc`` is accepted but not used.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    sizes = (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000)
    written = []
    for size in sizes:
        text = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        written.append((f.tell(), text))
        f.write(text)
    f.seek(0)
    read_back = []
    where = f.tell()
    text = f.readline()
    while text:
        read_back.append((where, text))
        where = f.tell()
        text = f.readline()
    self.assertEqual(read_back, written)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002271
def test_telling(self):
    # tell() must return the same cookies when reading lines back as it
    # did when they were written, and must raise IOError while the file
    # is being iterated over.
    f = self.open(support.TESTFN, "w+", encoding="utf8")
    line = "\xff\n"
    cookies = [f.tell()]
    for _ in range(2):
        f.write(line)
        cookies.append(f.tell())
    start, after_first, after_second = cookies
    f.seek(0)
    self.assertEqual(f.tell(), start)
    self.assertEqual(f.readline(), line)
    self.assertEqual(f.tell(), after_first)
    self.assertEqual(f.readline(), line)
    self.assertEqual(f.tell(), after_second)
    f.seek(0)
    for text in f:
        self.assertEqual(text, line)
        self.assertRaises(IOError, f.tell)
    # Once iteration is exhausted tell() works again.
    self.assertEqual(f.tell(), after_second)
    f.close()
2291
def test_seeking(self):
    # The ASCII prefix is two bytes shorter than the value returned by
    # _default_chunk_size(), so the three-byte UTF-8 character that follows
    # it crosses that offset.  tell() after reading the prefix must equal
    # the prefix length and readline() must return the complete suffix.
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = bytes(u_suffix.encode("utf-8"))
    line = prefix + suffix
    # Use context managers so both handles are closed even if a write or
    # an assertion fails; the read handle used to be left open.
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002309
def test_seeking_too(self):
    # Regression test for a specific bug: readline() followed by tell() on
    # a multi-byte UTF-8 character with a _CHUNK_SIZE smaller than the
    # character must not fail.
    data = b'\xe0\xbf\xbf\n'
    # Context managers close both handles even on failure; the read handle
    # used to be left open.
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
2321
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def check_seek_and_tell(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        sink = self.open(support.TESTFN, 'wb')
        sink.write(data)
        sink.close()
        # Decode the whole file once to obtain the reference text.
        reference = self.open(support.TESTFN, encoding='test_decoder')
        reference._CHUNK_SIZE = CHUNK_SIZE
        decoded = reference.read()
        reference.close()

        total = len(decoded)
        for seek_pos in range(min_pos, total + 1):
            for read_len in [1, 5, total - seek_pos]:
                f = self.open(support.TESTFN, encoding='test_decoder')
                self.assertEqual(f.read(seek_pos), decoded[:seek_pos])
                cookie = f.tell()
                self.assertEqual(f.read(read_len),
                                 decoded[seek_pos:seek_pos + read_len])
                f.seek(cookie)
                self.assertEqual(f.read(), decoded[seek_pos:])
                f.close()

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        cases = StatefulIncrementalDecoderTest.test_cases

        # Try each test case.
        for payload, _, _ in cases:
            check_seek_and_tell(payload)

        # Position each test case so that it crosses a chunk boundary.
        for payload, _, _ in cases:
            offset = CHUNK_SIZE - len(payload)//2
            padding = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            check_seek_and_tell(padding + payload, offset*2)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002368
def test_encoded_writes(self):
    data = "1234567890"
    doubled = data * 2
    # utf-16, utf-16-le, utf-16-be, utf-32, utf-32-le, utf-32-be
    encodings = [family + variant
                 for family in ("utf-16", "utf-32")
                 for variant in ("", "-le", "-be")]
    for encoding in encodings:
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding=encoding)
        # Check if the BOM is written only once (see issue1753).
        for _ in range(2):
            wrapper.write(data)
        # Read everything back twice, rewinding before each read.
        for _ in range(2):
            wrapper.seek(0)
            self.assertEqual(wrapper.read(), doubled)
        self.assertEqual(raw.getvalue(), doubled.encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002388
def test_unreadable(self):
    # read() on a wrapper whose buffer reports readable() == False must
    # raise IOError.
    class WriteOnlyBytesIO(self.BytesIO):
        def readable(self):
            return False
    wrapper = self.TextIOWrapper(WriteOnlyBytesIO())
    self.assertRaises(IOError, wrapper.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002395
def test_read_one_by_one(self):
    # Reading a single character at a time must still translate the
    # "\r\n" pair into one "\n".
    txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    collected = ""
    char = txt.read(1)
    while char:
        collected += char
        char = txt.read(1)
    self.assertEqual(collected, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002405
def test_readlines(self):
    # readlines() with no hint and with hint=None returns every line;
    # a hint of 5 stops after the first two lines.
    txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]
    self.assertEqual(txt.readlines(), every_line)
    txt.seek(0)
    self.assertEqual(txt.readlines(None), every_line)
    txt.seek(0)
    self.assertEqual(txt.readlines(5), every_line[:2])
2413
Christian Heimes1a6387e2008-03-26 12:49:49 +00002414 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    read_size = 128
    txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    collected = ""
    chunk = txt.read(read_size)
    while chunk:
        collected += chunk
        chunk = txt.read(read_size)
    self.assertEqual(collected, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002425
def test_writelines(self):
    # writelines() with a plain list writes the items back to back.
    lines = ['ab', 'cd', 'ef']
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw)
    wrapper.writelines(lines)
    wrapper.flush()
    self.assertEqual(raw.getvalue(), b'abcdef')
2433
def test_writelines_userlist(self):
    # writelines() must also accept a UserList, not only a real list.
    lines = UserList(['ab', 'cd', 'ef'])
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw)
    wrapper.writelines(lines)
    wrapper.flush()
    self.assertEqual(raw.getvalue(), b'abcdef')
2441
def test_writelines_error(self):
    # writelines() rejects non-text items, None and a bytes string.
    wrapper = self.TextIOWrapper(self.BytesIO())
    for bad_lines in ([1, 2, 3], None, b'abc'):
        self.assertRaises(TypeError, wrapper.writelines, bad_lines)
2447
def test_issue1395_1(self):
    raw = self.BytesIO(self.testdata)
    txt = self.TextIOWrapper(raw, encoding="ascii")

    # read one char at a time
    collected = ""
    char = txt.read(1)
    while char:
        collected += char
        char = txt.read(1)
    self.assertEqual(collected, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002459
def test_issue1395_2(self):
    raw = self.BytesIO(self.testdata)
    txt = self.TextIOWrapper(raw, encoding="ascii")
    txt._CHUNK_SIZE = 4

    # read four chars at a time, matching the chunk size set above
    collected = ""
    piece = txt.read(4)
    while piece:
        collected += piece
        piece = txt.read(4)
    self.assertEqual(collected, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002471
def test_issue1395_3(self):
    raw = self.BytesIO(self.testdata)
    txt = self.TextIOWrapper(raw, encoding="ascii")
    txt._CHUNK_SIZE = 4

    # two fixed-size reads followed by three readline() calls
    pieces = [txt.read(4), txt.read(4)]
    for _ in range(3):
        pieces.append(txt.readline())
    self.assertEqual("".join(pieces), self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002482
def test_issue1395_4(self):
    raw = self.BytesIO(self.testdata)
    txt = self.TextIOWrapper(raw, encoding="ascii")
    txt._CHUNK_SIZE = 4

    # one fixed-size read followed by a read of everything that is left
    head = txt.read(4)
    tail = txt.read()
    self.assertEqual(head + tail, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002490
def test_issue1395_5(self):
    raw = self.BytesIO(self.testdata)
    txt = self.TextIOWrapper(raw, encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Remember the position after the first four chars, rewind, seek back
    # to it, and check that reading resumes at the right place.
    txt.read(4)
    cookie = txt.tell()
    txt.seek(0)
    txt.seek(cookie)
    self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002500
def test_issue2282(self):
    # The wrapper must report the same seekable() value as its buffer.
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    self.assertEqual(raw.seekable(), wrapper.seekable())
2506
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # The position is not needed here; tell() is still called so
            # the test exercises the same code path as before.
            f.tell()
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(filename, 'rb') as f:
            # Encoding the whole text at once yields a single BOM.
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002521
def test_seek_bom(self):
    # The BOM is not written again when writing after a manual seek(),
    # either to the end of the existing text or back to position 0.
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            end_cookie = writer.tell()
        with self.open(path, 'r+', encoding=charset) as updater:
            updater.seek(end_cookie)
            updater.write('zzz')
            updater.seek(0)
            updater.write('bbb')
        with self.open(path, 'rb') as reader:
            expected = 'bbbzzz'.encode(charset)
            self.assertEqual(reader.read(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00002536
def test_errors_property(self):
    # ``errors`` is "strict" when not given, otherwise the value passed in.
    with self.open(support.TESTFN, "w") as default_file:
        self.assertEqual(default_file.errors, "strict")
    with self.open(support.TESTFN, "w", errors="replace") as lenient_file:
        self.assertEqual(lenient_file.errors, "replace")
2542
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    thread_count = 20
    start_signal = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def write_line(n):
            text = "Thread%03d\n" % n
            # Block until the event is set so the writes overlap.
            start_signal.wait()
            f.write(text)
        workers = []
        for n in range(thread_count):
            workers.append(threading.Thread(target=write_line, args=(n,)))
        with support.start_threads(workers, start_signal.set):
            time.sleep(0.02)
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Each thread's line must appear exactly once.
        for n in range(thread_count):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002560
def test_flush_error_on_close(self):
    # Test that text file is closed despite failed flush
    # and that flush() is called before file closed.
    raw = self.BytesIO(self.testdata)
    txt = self.TextIOWrapper(raw, encoding="ascii")
    state_at_flush = []
    def bad_flush():
        # Record whether wrapper and buffer were already closed, then fail.
        wrapper_closed = txt.closed
        buffer_closed = txt.buffer.closed
        state_at_flush[:] = [wrapper_closed, buffer_closed]
        raise IOError()
    txt.flush = bad_flush
    self.assertRaises(IOError, txt.close) # exception not swallowed
    self.assertTrue(txt.closed)
    self.assertTrue(txt.buffer.closed)
    self.assertTrue(state_at_flush)      # flush() called
    # flush() called before file closed
    self.assertFalse(state_at_flush[0])
    self.assertFalse(state_at_flush[1])
    txt.flush = lambda: None  # break reference loop
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002577
def test_multi_close(self):
    # close() may be called repeatedly; flush() afterwards raises
    # ValueError.
    raw = self.BytesIO(self.testdata)
    txt = self.TextIOWrapper(raw, encoding="ascii")
    for _ in range(3):
        txt.close()
    self.assertRaises(ValueError, txt.flush)
2584
def test_readonly_attributes(self):
    # Assigning to the ``buffer`` attribute must be rejected.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    with self.assertRaises((AttributeError, TypeError)):
        wrapper.buffer = replacement
2590
def test_read_nonbytes(self):
    # Issue #17106
    # Crash when underlying read() returns non-bytes
    # (the class used to be defined twice in a row; once is enough)
    class NonbytesStream(self.StringIO):
        read1 = self.StringIO.read
    # maybeRaises() is supplied by the concrete subclass: the C test class
    # maps it to assertRaises, the Python test class to a no-op context.
    t = self.TextIOWrapper(NonbytesStream('a'))
    with self.maybeRaises(TypeError):
        t.read(1)
    t = self.TextIOWrapper(NonbytesStream('a'))
    with self.maybeRaises(TypeError):
        t.readline()
    t = self.TextIOWrapper(NonbytesStream('a'))
    self.assertEqual(t.read(), u'a')
2606
def test_illegal_decoder(self):
    # Issue #17106
    # Crash when decoder returns non-string
    def new_wrapper():
        # A fresh wrapper for every read variant.
        return self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
                                  encoding='quopri_codec')
    t = new_wrapper()
    with self.maybeRaises(TypeError):
        t.read(1)
    t = new_wrapper()
    with self.maybeRaises(TypeError):
        t.readline()
    t = new_wrapper()
    with self.maybeRaises(TypeError):
        t.read()
2622
2623
class CTextIOWrapperTest(TextIOWrapperTest):

    # In this class the maybeRaises() hook used by the shared tests is
    # plain assertRaises: the exception is required.
    maybeRaises = unittest.TestCase.assertRaises

    def test_initialization(self):
        # A failed re-initialization leaves the object unusable: read()
        # raises ValueError afterwards.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        for exc_type, bad_newline in ((TypeError, 42), (ValueError, 'xyzzy')):
            self.assertRaises(exc_type, wrapper.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

        # repr() of an object that never went through __init__ must raise.
        uninitialized = self.TextIOWrapper.__new__(self.TextIOWrapper)
        self.assertRaises(Exception, repr, uninitialized)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(rawio)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        wrapper.x = wrapper  # reference cycle through the object itself
        wr = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()
2669
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002670
class PyTextIOWrapperTest(TextIOWrapperTest):

    @contextlib.contextmanager
    def maybeRaises(self, *expected, **options):
        # No-op context manager: the arguments are ignored and the body of
        # the ``with`` block simply runs, so no exception is required here.
        yield
Antoine Pitrou19690592009-06-12 20:14:08 +00002675
2676
class IncrementalNewlineDecoderTest(unittest.TestCase):

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def expect(raw, text, **kwargs):
            # We exercise getstate() / setstate() as well as decode():
            # decoding again from the saved state must give the same text.
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(raw, **kwargs), text)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(raw, **kwargs), text)

        def expect_all(pairs):
            for raw, text in pairs:
                expect(raw, text)

        expect(b'\xe8\xa2\x88', "\u8888")

        # The same character fed one byte at a time, twice over.
        split_char = ((b'\xe8', ""), (b'\xa2', ""), (b'\x88', "\u8888"))
        for _ in range(2):
            expect_all(split_char)

        # A sequence left incomplete is an error once final=True is passed.
        expect(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        expect(b'\n', "\n")
        expect(b'\r', "")
        expect(b'', "\n", final=True)
        expect(b'\r', "\n", final=True)

        expect_all((
            (b'\r', ""),
            (b'a', "\na"),

            (b'\r\r\n', "\n\n"),
            (b'\r', ""),
            (b'\r', "\n"),
            (b'\na', "\na"),

            (b'\xe8\xa2\x88\r\n', "\u8888\n"),
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\n', "\n"),
            (b'\xe8\xa2\x88\r', "\u8888"),
            (b'\n', "\n"),
        ))

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to ``decoder`` one unit at a time and check both the
        # translated output and the ``newlines`` attribute along the way.
        # With an encoding the text is encoded first and fed bytewise;
        # with encoding=None it is fed one character at a time.
        if encoding is None:
            encoder = None
        else:
            encoder = codecs.getincrementalencoder(encoding)()
        decoded = []

        def feed(text):
            units = text if encoder is None else encoder.encode(text)
            for unit in units:
                decoded.append(decoder.decode(unit))

        self.assertEqual(decoder.newlines, None)
        steps = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, seen_newlines in steps:
            feed(text)
            self.assertEqual(decoder.newlines, seen_newlines)
        feed("abc\r")
        self.assertEqual("".join(decoded), "abc\n\nabcabc\nabcabc")

        # After reset() the record of seen newlines is cleared.
        decoder.reset()
        payload = "abc"
        if encoder is not None:
            encoder.reset()
            payload = encoder.encode(payload)
        self.assertEqual(decoder.decode(payload), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            inner = codecs.getincrementaldecoder(enc)() if enc else enc
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)
        utf8_inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(utf8_inner, translate=True)
        self.check_newline_decoding_utf8(wrapped)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def check_no_newline_seen(dec):
            # U+0D00 and U+0A00 must pass through unchanged and must not
            # be recorded as newlines.
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)
        for translate in (False, True):
            dec = self.IncrementalNewlineDecoder(None, translate=translate)
            check_no_newline_seen(dec)
2782
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Concrete subclass of the shared newline-decoder tests.

    Nothing is overridden here; presumably the C implementation under test
    is attached to this class elsewhere in the file.
    """
2785
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Concrete subclass of the shared newline-decoder tests.

    Nothing is overridden here; presumably the Python implementation under
    test is attached to this class elsewhere in the file.
    """
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002788
Christian Heimes1a6387e2008-03-26 12:49:49 +00002789
2790# XXX Tests for open()
2791
2792class MiscIOTest(unittest.TestCase):
2793
def tearDown(self):
    # Remove the scratch file the tests in this class write to.
    scratch_path = support.TESTFN
    support.unlink(scratch_path)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002796
def test___all__(self):
    # Every name in __all__ must exist.  Apart from open() and the SEEK_*
    # names, each is either an exception class or an IOBase subclass.
    for name in self.io.__all__:
        obj = getattr(self.io, name, None)
        self.assertTrue(obj is not None, name)
        if name == "open":
            continue
        is_exception = ("error" in name.lower()
                        or name == "UnsupportedOperation")
        if is_exception:
            self.assertTrue(issubclass(obj, Exception), name)
            continue
        if name.startswith("SEEK_"):
            continue
        self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002807
def test_attributes(self):
    # Check the ``mode`` and ``name`` attributes exposed by each layer of
    # the objects returned by open().
    unbuffered = self.open(support.TESTFN, "wb", buffering=0)
    self.assertEqual(unbuffered.mode, "wb")
    unbuffered.close()

    f = self.open(support.TESTFN, "U")
    for layer in (f, f.buffer, f.buffer.raw):
        self.assertEqual(layer.name, support.TESTFN)
    self.assertEqual(f.mode, "U")
    for layer in (f.buffer, f.buffer.raw):
        self.assertEqual(layer.mode, "rb")
    f.close()

    f = self.open(support.TESTFN, "w+")
    self.assertEqual(f.mode, "w+")
    self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
    self.assertEqual(f.buffer.raw.mode, "rb+")

    # Opening by file descriptor: ``name`` is the descriptor itself.
    g = self.open(f.fileno(), "wb", closefd=False)
    for layer in (g, g.raw):
        self.assertEqual(layer.mode, "wb")
    for layer in (g, g.raw):
        self.assertEqual(layer.name, f.fileno())
    f.close()
    g.close()
2834
def test_io_after_close(self):
    # Every I/O operation on a closed file object must raise ValueError,
    # whatever mode and buffering it was opened with.
    open_kwargs = []
    for text_mode in ("w", "r", "w+"):
        binary_mode = text_mode + "b"
        open_kwargs.extend([
            {"mode": text_mode},
            {"mode": binary_mode},
            {"mode": text_mode, "buffering": 1},
            {"mode": text_mode, "buffering": 2},
            {"mode": binary_mode, "buffering": 0},
        ])

    def closed_raises(func, *args):
        self.assertRaises(ValueError, func, *args)

    for kwargs in open_kwargs:
        f = self.open(support.TESTFN, **kwargs)
        f.close()
        closed_raises(f.flush)
        closed_raises(f.fileno)
        closed_raises(f.isatty)
        closed_raises(f.__iter__)
        # Some methods only exist on certain layers; check them if present.
        if hasattr(f, "peek"):
            closed_raises(f.peek, 1)
        closed_raises(f.read)
        if hasattr(f, "read1"):
            closed_raises(f.read1, 1024)
        if hasattr(f, "readall"):
            closed_raises(f.readall)
        if hasattr(f, "readinto"):
            closed_raises(f.readinto, bytearray(1024))
        closed_raises(f.readline)
        closed_raises(f.readlines)
        closed_raises(f.seek, 0)
        closed_raises(f.tell)
        closed_raises(f.truncate)
        # Pass bytes to binary files and text to text files.
        empty = b"" if "b" in kwargs['mode'] else ""
        closed_raises(f.write, empty)
        closed_raises(f.writelines, [])
        closed_raises(next, f)
2877
def test_blockingioerror(self):
    # Various BlockingIOError issues
    for bad_args in [(), (1,), (1, 2, 3, 4), (1, "", None)]:
        self.assertRaises(TypeError, self.BlockingIOError, *bad_args)
    err = self.BlockingIOError(1, "")
    self.assertEqual(err.characters_written, 0)

    # Build a reference cycle between an exception and its message, then
    # check that a garbage collection pass gets rid of it.
    class C(unicode):
        pass
    message = C("")
    err = self.BlockingIOError(1, message)
    message.b = err
    err.c = message
    message_ref = weakref.ref(message)
    del message, err
    support.gc_collect()
    self.assertIsNone(message_ref(), message_ref)
Antoine Pitrou19690592009-06-12 20:14:08 +00002896
2897 def test_abcs(self):
2898 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002899 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2900 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2901 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2902 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002903
def _check_abc_inheritance(self, abcmodule):
    # Open the test file as a raw, a buffered and a text object in turn.
    # Each must be an instance of abcmodule.IOBase and of exactly one of
    # the three layer ABCs taken from *abcmodule*.
    layer_names = ("RawIOBase", "BufferedIOBase", "TextIOBase")
    scenarios = [
        # (mode, extra open() keywords, the one layer ABC expected)
        ("wb", {"buffering": 0}, "RawIOBase"),
        ("wb", {}, "BufferedIOBase"),
        ("w", {}, "TextIOBase"),
    ]
    for mode, extra, expected_layer in scenarios:
        with self.open(support.TESTFN, mode, **extra) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            for layer_name in layer_names:
                layer_abc = getattr(abcmodule, layer_name)
                if layer_name == expected_layer:
                    self.assertIsInstance(f, layer_abc)
                else:
                    self.assertNotIsInstance(f, layer_abc)
Antoine Pitrou19690592009-06-12 20:14:08 +00002920
2921 def test_abc_inheritance(self):
2922 # Test implementations inherit from their respective ABCs
2923 self._check_abc_inheritance(self)
2924
2925 def test_abc_inheritance_official(self):
2926 # Test implementations inherit from the official ABCs of the
2927 # baseline "io" module.
2928 self._check_abc_inheritance(io)
2929
Antoine Pitrou5aa7df32011-11-21 20:16:44 +01002930 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2931 def test_nonblock_pipe_write_bigbuf(self):
2932 self._test_nonblock_pipe_write(16*1024)
2933
2934 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2935 def test_nonblock_pipe_write_smallbuf(self):
2936 self._test_nonblock_pipe_write(1024)
2937
2938 def _set_non_blocking(self, fd):
2939 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2940 self.assertNotEqual(flags, -1)
2941 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2942 self.assertEqual(res, 0)
2943
def _test_nonblock_pipe_write(self, bufsize):
    # Write through a non-blocking pipe wrapped in buffered objects of
    # size *bufsize* and check that what was read back matches what was
    # recorded as written.
    sent_chunks = []
    received_chunks = []
    read_fd, write_fd = os.pipe()
    self._set_non_blocking(read_fd)
    self._set_non_blocking(write_fd)

    # To exercise all code paths in the C implementation we need
    # to play with buffer sizes.  For instance, if we choose a
    # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
    # then we will never get a partial write of the buffer.
    reader = self.open(read_fd, mode='rb', closefd=True, buffering=bufsize)
    writer = self.open(write_fd, mode='wb', closefd=True, buffering=bufsize)

    with reader, writer:
        for chunk_len in 9999, 73, 7574:
            try:
                counter = 0
                # Keep writing until write() raises BlockingIOError.
                while True:
                    payload = bytes([counter % 26 + 97]) * chunk_len
                    sent_chunks.append(payload)
                    writer.write(payload)
                    counter += 1

            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                # Only keep the part of the last chunk that the
                # exception reports as written.
                sent_chunks[-1] = sent_chunks[-1][:e.characters_written]
                received_chunks.append(reader.read())
                payload = b'BLOCKED'
                writer.write(payload)
                sent_chunks.append(payload)

        # Flush, reading from the other end each time flush() raises
        # BlockingIOError.
        while True:
            try:
                writer.flush()
                break
            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.characters_written, 0)
                received_chunks.append(reader.read())

        # Read until read() returns None.
        received_chunks.extend(iter(reader.read, None))

    all_sent = b''.join(sent_chunks)
    all_received = b''.join(received_chunks)
    self.assertTrue(all_sent == all_received)
    self.assertTrue(writer.closed)
    self.assertTrue(reader.closed)
2991
class CMiscIOTest(MiscIOTest):
    # Run the MiscIOTest tests against the "io" module.
    io = io
2994
class PyMiscIOTest(MiscIOTest):
    # Run the MiscIOTest tests against the "pyio" module.
    io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002997
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002998
2999@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3000class SignalsTest(unittest.TestCase):
3001
3002 def setUp(self):
3003 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3004
3005 def tearDown(self):
3006 signal.signal(signal.SIGALRM, self.oldalrm)
3007
3008 def alarm_interrupt(self, sig, frame):
Florent Xicluna3fa3b002010-09-13 08:20:19 +00003009 1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003010
3011 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinner49d495f2011-07-04 11:44:46 +02003012 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
3013 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003014 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3015 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou6439c002011-02-25 21:35:47 +00003016 invokes the signal handler, and bubbles up the exception raised
3017 in the latter."""
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003018 read_results = []
3019 def _read():
3020 s = os.read(r, 1)
3021 read_results.append(s)
3022 t = threading.Thread(target=_read)
3023 t.daemon = True
3024 r, w = os.pipe()
3025 try:
3026 wio = self.io.open(w, **fdopen_kwargs)
3027 t.start()
3028 signal.alarm(1)
3029 # Fill the pipe enough that the write will be blocking.
3030 # It will be interrupted by the timer armed above. Since the
3031 # other thread has read one byte, the low-level write will
3032 # return with a successful (partial) result rather than an EINTR.
3033 # The buffered IO layer must check for pending signal
3034 # handlers, which in this case will invoke alarm_interrupt().
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03003035 try:
3036 with self.assertRaises(ZeroDivisionError):
3037 wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1))
3038 finally:
3039 t.join()
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003040 # We got one byte, get another one and check that it isn't a
3041 # repeat of the first one.
3042 read_results.append(os.read(r, 1))
3043 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3044 finally:
3045 os.close(w)
3046 os.close(r)
3047 # This is deliberate. If we didn't close the file descriptor
3048 # before closing wio, wio would try to flush its internal
3049 # buffer, and block again.
3050 try:
3051 wio.close()
3052 except IOError as e:
3053 if e.errno != errno.EBADF:
3054 raise
3055
3056 def test_interrupted_write_unbuffered(self):
3057 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3058
3059 def test_interrupted_write_buffered(self):
3060 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3061
3062 def test_interrupted_write_text(self):
3063 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3064
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003065 def check_reentrant_write(self, data, **fdopen_kwargs):
3066 def on_alarm(*args):
3067 # Will be called reentrantly from the same thread
3068 wio.write(data)
Victor Stinner4c41f842011-07-05 13:29:26 +02003069 1//0
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003070 signal.signal(signal.SIGALRM, on_alarm)
3071 r, w = os.pipe()
3072 wio = self.io.open(w, **fdopen_kwargs)
3073 try:
3074 signal.alarm(1)
3075 # Either the reentrant call to wio.write() fails with RuntimeError,
3076 # or the signal handler raises ZeroDivisionError.
3077 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3078 while 1:
3079 for i in range(100):
3080 wio.write(data)
3081 wio.flush()
3082 # Make sure the buffer doesn't fill up and block further writes
3083 os.read(r, len(data) * 100)
3084 exc = cm.exception
3085 if isinstance(exc, RuntimeError):
3086 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3087 finally:
3088 wio.close()
3089 os.close(r)
3090
3091 def test_reentrant_write_buffered(self):
3092 self.check_reentrant_write(b"xy", mode="wb")
3093
3094 def test_reentrant_write_text(self):
3095 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3096
Antoine Pitrou6439c002011-02-25 21:35:47 +00003097 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3098 """Check that a buffered read, when it gets interrupted (either
3099 returning a partial result or EINTR), properly invokes the signal
3100 handler and retries if the latter returned successfully."""
3101 r, w = os.pipe()
3102 fdopen_kwargs["closefd"] = False
3103 def alarm_handler(sig, frame):
3104 os.write(w, b"bar")
3105 signal.signal(signal.SIGALRM, alarm_handler)
3106 try:
3107 rio = self.io.open(r, **fdopen_kwargs)
3108 os.write(w, b"foo")
3109 signal.alarm(1)
3110 # Expected behaviour:
3111 # - first raw read() returns partial b"foo"
3112 # - second raw read() returns EINTR
3113 # - third raw read() returns b"bar"
3114 self.assertEqual(decode(rio.read(6)), "foobar")
3115 finally:
3116 rio.close()
3117 os.close(w)
3118 os.close(r)
3119
3120 def test_interrupterd_read_retry_buffered(self):
3121 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3122 mode="rb")
3123
3124 def test_interrupterd_read_retry_text(self):
3125 self.check_interrupted_read_retry(lambda x: x,
3126 mode="r")
3127
3128 @unittest.skipUnless(threading, 'Threading required for this test.')
3129 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3130 """Check that a buffered write, when it gets interrupted (either
3131 returning a partial result or EINTR), properly invokes the signal
3132 handler and retries if the latter returned successfully."""
3133 select = support.import_module("select")
3134 # A quantity that exceeds the buffer size of an anonymous pipe's
3135 # write end.
Antoine Pitrou68915d72013-04-24 23:31:38 +02003136 N = support.PIPE_MAX_SIZE
Antoine Pitrou6439c002011-02-25 21:35:47 +00003137 r, w = os.pipe()
3138 fdopen_kwargs["closefd"] = False
3139 # We need a separate thread to read from the pipe and allow the
3140 # write() to finish. This thread is started after the SIGALRM is
3141 # received (forcing a first EINTR in write()).
3142 read_results = []
3143 write_finished = False
Serhiy Storchaka53ea1622015-03-28 20:38:48 +02003144 error = [None]
Antoine Pitrou6439c002011-02-25 21:35:47 +00003145 def _read():
Serhiy Storchaka53ea1622015-03-28 20:38:48 +02003146 try:
3147 while not write_finished:
3148 while r in select.select([r], [], [], 1.0)[0]:
3149 s = os.read(r, 1024)
3150 read_results.append(s)
3151 except BaseException as exc:
3152 error[0] = exc
Antoine Pitrou6439c002011-02-25 21:35:47 +00003153 t = threading.Thread(target=_read)
3154 t.daemon = True
3155 def alarm1(sig, frame):
3156 signal.signal(signal.SIGALRM, alarm2)
3157 signal.alarm(1)
3158 def alarm2(sig, frame):
3159 t.start()
3160 signal.signal(signal.SIGALRM, alarm1)
3161 try:
3162 wio = self.io.open(w, **fdopen_kwargs)
3163 signal.alarm(1)
3164 # Expected behaviour:
3165 # - first raw write() is partial (because of the limited pipe buffer
3166 # and the first alarm)
3167 # - second raw write() returns EINTR (because of the second alarm)
3168 # - subsequent write()s are successful (either partial or complete)
3169 self.assertEqual(N, wio.write(item * N))
3170 wio.flush()
3171 write_finished = True
3172 t.join()
Serhiy Storchaka53ea1622015-03-28 20:38:48 +02003173
3174 self.assertIsNone(error[0])
Antoine Pitrou6439c002011-02-25 21:35:47 +00003175 self.assertEqual(N, sum(len(x) for x in read_results))
3176 finally:
3177 write_finished = True
3178 os.close(w)
3179 os.close(r)
3180 # This is deliberate. If we didn't close the file descriptor
3181 # before closing wio, wio would try to flush its internal
3182 # buffer, and could block (in case of failure).
3183 try:
3184 wio.close()
3185 except IOError as e:
3186 if e.errno != errno.EBADF:
3187 raise
3188
3189 def test_interrupterd_write_retry_buffered(self):
3190 self.check_interrupted_write_retry(b"x", mode="wb")
3191
3192 def test_interrupterd_write_retry_text(self):
3193 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3194
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003195
class CSignalsTest(SignalsTest):
    # Run the SignalsTest tests against the "io" module.
    io = io
3198
class PySignalsTest(SignalsTest):
    # Run the SignalsTest tests against the "pyio" module.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3206
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003207
def test_main():
    # Entry point: inject the io namespace under test into each test
    # class, then run them all.
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    module_globals = globals()
    # Each mock is stored under its unprefixed name and resolved to the
    # "C"- or "Py"-prefixed global of the same name.
    c_io_ns.update({mock.__name__: module_globals["C" + mock.__name__]
                    for mock in mocks})
    py_io_ns.update({mock.__name__: module_globals["Py" + mock.__name__]
                     for mock in mocks})
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        # Classes whose name has neither prefix are left untouched.
        if test.__name__.startswith("C"):
            namespace = c_io_ns
        elif test.__name__.startswith("Py"):
            namespace = py_io_ns
        else:
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00003242
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()