blob: fc68e4dbf83058dbb1f872e6ddf59c25149f32de [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +030032import warnings
Antoine Pitrou19690592009-06-12 20:14:08 +000033import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000034import signal
35import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000036from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000037from collections import deque
Antoine Pitrou78e761e2012-10-16 22:57:11 +020038from UserList import UserList
Antoine Pitrou19690592009-06-12 20:14:08 +000039from test import test_support as support
Serhiy Storchaka354d50e2013-02-03 17:10:42 +020040import contextlib
Christian Heimes1a6387e2008-03-26 12:49:49 +000041
42import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000043import io # C implementation of io
44import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000045try:
46 import threading
47except ImportError:
48 threading = None
Antoine Pitrou5aa7df32011-11-21 20:16:44 +010049try:
50 import fcntl
51except ImportError:
52 fcntl = None
Antoine Pitrou19690592009-06-12 20:14:08 +000053
# Python 2: a module-level __metaclass__ makes every class statement in this
# module that lists no base class (e.g. the Mock* helpers below) create a
# new-style class.
__metaclass__ = type
# Rebind the name ``bytes`` to a helper from the test-support module.
# NOTE(review): presumably it emulates the Python 3 bytes() constructor --
# confirm against test_support.py3k_bytes.
bytes = support.py3k_bytes
56
57def _default_chunk_size():
58 """Get the default TextIOWrapper chunk size"""
59 with io.open(__file__, "r", encoding="latin1") as f:
60 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000061
62
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    ``read_stack`` is an iterable of chunks that readinto() hands out in
    order.  A chunk that is None makes one readinto() call return None.
    Everything passed to write() is recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        # _reads counts every readinto() call; _extraneous_reads counts the
        # calls made after the read stack was already exhausted.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        """Record a bytes copy of *b* and report all of it as written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary constant; tests compare against this value.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # whence defaults to 0 like IOBase.seek(), so that a direct
        # one-argument call works as well as the two-argument form.
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next queued chunk into *buf*.

        Returns the number of bytes stored, None when the next queued chunk
        is None, or 0 once the read stack is exhausted.  A chunk larger than
        *buf* is split: the buffer is filled and the remainder stays at the
        head of the stack for the next call.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            del self._read_stack[0]
            buf[:n] = data
            return n
        buf[:] = data[:max_len]
        self._read_stack[0] = data[max_len:]
        return max_len

    def truncate(self, pos=None):
        # Nothing is actually truncated; the argument is echoed back.
        return pos
118
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead combined with RawIOBase from the C io module."""
    pass

class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead combined with RawIOBase from the Python _pyio module."""
    pass
124
125
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead extended with an explicit read() method."""

    def read(self, n=None):
        """Pop and return the next queued chunk; *n* is ignored.

        Once the read stack is empty the call is counted in
        ``_extraneous_reads`` and b"" is returned.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted stack is expected here; anything else
            # (KeyboardInterrupt included) must propagate to the caller.
            self._extraneous_reads += 1
            return b""
135
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with RawIOBase from the C io module."""
    pass

class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with RawIOBase from the Python _pyio module."""
    pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000141
142
class MisbehavedRawIO(MockRawIO):
    """A MockRawIO whose methods deliberately report wrong results.

    write() and read() double whatever MockRawIO returns, seek() and tell()
    return negative positions, and readinto() claims to have stored five
    times the size of the buffer it was given.
    """

    def write(self, b):
        return MockRawIO.write(self, b) * 2

    def read(self, n=None):
        return MockRawIO.read(self, n) * 2

    def seek(self, pos, whence=0):
        # whence defaults to 0 like IOBase.seek(); the result is bogus on
        # purpose.
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # The real transfer still happens; only the reported count is wrong.
        MockRawIO.readinto(self, buf)
        return len(buf) * 5
158 return len(buf) * 5
159
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with RawIOBase from the C io module."""
    pass

class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with RawIOBase from the Python _pyio module."""
    pass
165
166
class CloseFailureIO(MockRawIO):
    """A MockRawIO whose first close() marks it closed and raises IOError.

    Later close() calls return silently.
    """
    closed = 0

    def close(self):
        if self.closed:
            return
        self.closed = 1
        raise IOError
173 raise IOError
174
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with RawIOBase from the C io module."""
    pass

class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with RawIOBase from the Python _pyio module."""
    pass
180
181
class MockFileIO:
    """Mixin that logs the size of every read in ``read_history``.

    It has to be combined with a class that provides __init__(data), read()
    and readinto(); all real work is delegated to it through super().
    """

    def __init__(self, data):
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        chunk = super(MockFileIO, self).read(n)
        # A None result is logged as None, anything else by its length.
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super(MockFileIO, self).readinto(b)
        self.read_history.append(count)
        return count
Christian Heimes1a6387e2008-03-26 12:49:49 +0000197
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO layered over BytesIO from the C io module."""
    pass

class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO layered over BytesIO from the Python _pyio module."""
    pass
203
204
class MockNonBlockWriterIO:
    """Raw writer mock that can pretend a write would block.

    Written data is collected in ``_write_stack``.  After block_on(char),
    the next write() containing *char* is cut short right before it; a
    write() that starts with *char* returns None once and disarms the
    blocker.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far as one string and forget it."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        data = bytes(b)
        if self._blocker_char:
            try:
                cut = data.index(self._blocker_char)
            except ValueError:
                # No blocker in this chunk: fall through to a full write.
                cut = None
            if cut is not None:
                if cut > 0:
                    # write data up to the first blocker
                    self._write_stack.append(data[:cut])
                    return cut
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(data)
        return len(data)
248
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO combined with RawIOBase from the C io module."""
    # The BlockingIOError class of the same implementation, reachable through
    # the mock.  NOTE(review): presumably used by tests outside this view.
    BlockingIOError = io.BlockingIOError

class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO combined with RawIOBase from the Python _pyio module."""
    # Same as above, for the _pyio implementation.
    BlockingIOError = pyio.BlockingIOError
254
Christian Heimes1a6387e2008-03-26 12:49:49 +0000255
256class IOTest(unittest.TestCase):
257
Antoine Pitrou19690592009-06-12 20:14:08 +0000258 def setUp(self):
259 support.unlink(support.TESTFN)
260
Christian Heimes1a6387e2008-03-26 12:49:49 +0000261 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000262 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000263
264 def write_ops(self, f):
265 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000266 f.truncate(0)
267 self.assertEqual(f.tell(), 5)
268 f.seek(0)
269
270 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000271 self.assertEqual(f.seek(0), 0)
272 self.assertEqual(f.write(b"Hello."), 6)
273 self.assertEqual(f.tell(), 6)
274 self.assertEqual(f.seek(-1, 1), 5)
275 self.assertEqual(f.tell(), 5)
276 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
277 self.assertEqual(f.seek(0), 0)
278 self.assertEqual(f.write(b"h"), 1)
279 self.assertEqual(f.seek(-1, 2), 13)
280 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000281
Christian Heimes1a6387e2008-03-26 12:49:49 +0000282 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000283 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000284 self.assertRaises(TypeError, f.seek, 0.0)
285
286 def read_ops(self, f, buffered=False):
287 data = f.read(5)
288 self.assertEqual(data, b"hello")
289 data = bytearray(data)
290 self.assertEqual(f.readinto(data), 5)
291 self.assertEqual(data, b" worl")
292 self.assertEqual(f.readinto(data), 2)
293 self.assertEqual(len(data), 5)
294 self.assertEqual(data[:2], b"d\n")
295 self.assertEqual(f.seek(0), 0)
296 self.assertEqual(f.read(20), b"hello world\n")
297 self.assertEqual(f.read(1), b"")
298 self.assertEqual(f.readinto(bytearray(b"x")), 0)
299 self.assertEqual(f.seek(-6, 2), 6)
300 self.assertEqual(f.read(5), b"world")
301 self.assertEqual(f.read(0), b"")
302 self.assertEqual(f.readinto(bytearray()), 0)
303 self.assertEqual(f.seek(-6, 1), 5)
304 self.assertEqual(f.read(5), b" worl")
305 self.assertEqual(f.tell(), 10)
306 self.assertRaises(TypeError, f.seek, 0.0)
307 if buffered:
308 f.seek(0)
309 self.assertEqual(f.read(), b"hello world\n")
310 f.seek(6)
311 self.assertEqual(f.read(), b"world\n")
312 self.assertEqual(f.read(), b"")
313
314 LARGE = 2**31
315
316 def large_file_ops(self, f):
317 assert f.readable()
318 assert f.writable()
319 self.assertEqual(f.seek(self.LARGE), self.LARGE)
320 self.assertEqual(f.tell(), self.LARGE)
321 self.assertEqual(f.write(b"xxx"), 3)
322 self.assertEqual(f.tell(), self.LARGE + 3)
323 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
324 self.assertEqual(f.truncate(), self.LARGE + 2)
325 self.assertEqual(f.tell(), self.LARGE + 2)
326 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
327 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000328 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000329 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
330 self.assertEqual(f.seek(-1, 2), self.LARGE)
331 self.assertEqual(f.read(2), b"x")
332
Antoine Pitrou19690592009-06-12 20:14:08 +0000333 def test_invalid_operations(self):
334 # Try writing on a file opened in read mode and vice-versa.
335 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000336 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000337 self.assertRaises(IOError, fp.read)
338 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000339 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000340 self.assertRaises(IOError, fp.write, b"blah")
341 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000342 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000343 self.assertRaises(IOError, fp.write, "blah")
344 self.assertRaises(IOError, fp.writelines, ["blah\n"])
345
Christian Heimes1a6387e2008-03-26 12:49:49 +0000346 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000347 with self.open(support.TESTFN, "wb", buffering=0) as f:
348 self.assertEqual(f.readable(), False)
349 self.assertEqual(f.writable(), True)
350 self.assertEqual(f.seekable(), True)
351 self.write_ops(f)
352 with self.open(support.TESTFN, "rb", buffering=0) as f:
353 self.assertEqual(f.readable(), True)
354 self.assertEqual(f.writable(), False)
355 self.assertEqual(f.seekable(), True)
356 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000357
358 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000359 with self.open(support.TESTFN, "wb") as f:
360 self.assertEqual(f.readable(), False)
361 self.assertEqual(f.writable(), True)
362 self.assertEqual(f.seekable(), True)
363 self.write_ops(f)
364 with self.open(support.TESTFN, "rb") as f:
365 self.assertEqual(f.readable(), True)
366 self.assertEqual(f.writable(), False)
367 self.assertEqual(f.seekable(), True)
368 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000369
370 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000371 with self.open(support.TESTFN, "wb") as f:
372 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
373 with self.open(support.TESTFN, "rb") as f:
374 self.assertEqual(f.readline(), b"abc\n")
375 self.assertEqual(f.readline(10), b"def\n")
376 self.assertEqual(f.readline(2), b"xy")
377 self.assertEqual(f.readline(4), b"zzy\n")
378 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000379 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000380 self.assertRaises(TypeError, f.readline, 5.3)
381 with self.open(support.TESTFN, "r") as f:
382 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000383
384 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000385 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000386 self.write_ops(f)
387 data = f.getvalue()
388 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000389 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000390 self.read_ops(f, True)
391
392 def test_large_file_ops(self):
393 # On Windows and Mac OSX this test comsumes large resources; It takes
394 # a long time to build the >2GB file and takes >2GB of disk space
395 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000396 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware1f702212013-12-10 14:09:20 -0600397 support.requires(
398 'largefile',
399 'test requires %s bytes and a long time to run' % self.LARGE)
Antoine Pitrou19690592009-06-12 20:14:08 +0000400 with self.open(support.TESTFN, "w+b", 0) as f:
401 self.large_file_ops(f)
402 with self.open(support.TESTFN, "w+b") as f:
403 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000404
405 def test_with_open(self):
406 for bufsize in (0, 1, 100):
407 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000408 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000409 f.write(b"xxx")
410 self.assertEqual(f.closed, True)
411 f = None
412 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000413 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000414 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000415 except ZeroDivisionError:
416 self.assertEqual(f.closed, True)
417 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000418 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000419
Antoine Pitroue741cc62009-01-21 00:45:36 +0000420 # issue 5008
421 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000422 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000423 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000424 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000425 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000426 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000427 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000428 with self.open(support.TESTFN, "a") as f:
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000429 self.assertTrue(f.tell() > 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000430
Christian Heimes1a6387e2008-03-26 12:49:49 +0000431 def test_destructor(self):
432 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000433 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000434 def __del__(self):
435 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000436 try:
437 f = super(MyFileIO, self).__del__
438 except AttributeError:
439 pass
440 else:
441 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000442 def close(self):
443 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000444 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000445 def flush(self):
446 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000447 super(MyFileIO, self).flush()
448 f = MyFileIO(support.TESTFN, "wb")
449 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000450 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000451 support.gc_collect()
452 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000453 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000454 self.assertEqual(f.read(), b"xxx")
455
456 def _check_base_destructor(self, base):
457 record = []
458 class MyIO(base):
459 def __init__(self):
460 # This exercises the availability of attributes on object
461 # destruction.
462 # (in the C version, close() is called by the tp_dealloc
463 # function, not by __del__)
464 self.on_del = 1
465 self.on_close = 2
466 self.on_flush = 3
467 def __del__(self):
468 record.append(self.on_del)
469 try:
470 f = super(MyIO, self).__del__
471 except AttributeError:
472 pass
473 else:
474 f()
475 def close(self):
476 record.append(self.on_close)
477 super(MyIO, self).close()
478 def flush(self):
479 record.append(self.on_flush)
480 super(MyIO, self).flush()
481 f = MyIO()
482 del f
483 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000484 self.assertEqual(record, [1, 2, 3])
485
Antoine Pitrou19690592009-06-12 20:14:08 +0000486 def test_IOBase_destructor(self):
487 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000488
Antoine Pitrou19690592009-06-12 20:14:08 +0000489 def test_RawIOBase_destructor(self):
490 self._check_base_destructor(self.RawIOBase)
491
492 def test_BufferedIOBase_destructor(self):
493 self._check_base_destructor(self.BufferedIOBase)
494
495 def test_TextIOBase_destructor(self):
496 self._check_base_destructor(self.TextIOBase)
497
498 def test_close_flushes(self):
499 with self.open(support.TESTFN, "wb") as f:
500 f.write(b"xxx")
501 with self.open(support.TESTFN, "rb") as f:
502 self.assertEqual(f.read(), b"xxx")
503
504 def test_array_writes(self):
505 a = array.array(b'i', range(10))
506 n = len(a.tostring())
507 with self.open(support.TESTFN, "wb", 0) as f:
508 self.assertEqual(f.write(a), n)
509 with self.open(support.TESTFN, "wb") as f:
510 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000511
512 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000513 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000514 closefd=False)
515
Antoine Pitrou19690592009-06-12 20:14:08 +0000516 def test_read_closed(self):
517 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000518 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000519 with self.open(support.TESTFN, "r") as f:
520 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000521 self.assertEqual(file.read(), "egg\n")
522 file.seek(0)
523 file.close()
524 self.assertRaises(ValueError, file.read)
525
526 def test_no_closefd_with_filename(self):
527 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000528 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000529
530 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000531 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000532 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000533 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000534 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000535 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000536 self.assertEqual(file.buffer.raw.closefd, False)
537
Antoine Pitrou19690592009-06-12 20:14:08 +0000538 def test_garbage_collection(self):
539 # FileIO objects are collected, and collecting them flushes
540 # all data to disk.
541 f = self.FileIO(support.TESTFN, "wb")
542 f.write(b"abcxxx")
543 f.f = f
544 wr = weakref.ref(f)
545 del f
546 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000547 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000548 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000549 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000550
Antoine Pitrou19690592009-06-12 20:14:08 +0000551 def test_unbounded_file(self):
552 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
553 zero = "/dev/zero"
554 if not os.path.exists(zero):
555 self.skipTest("{0} does not exist".format(zero))
556 if sys.maxsize > 0x7FFFFFFF:
557 self.skipTest("test can only run in a 32-bit address space")
558 if support.real_max_memuse < support._2G:
559 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000560 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000561 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000562 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000563 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000564 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000565 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000566
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200567 def check_flush_error_on_close(self, *args, **kwargs):
568 # Test that the file is closed despite failed flush
569 # and that flush() is called before file closed.
570 f = self.open(*args, **kwargs)
571 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000572 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200573 closed[:] = [f.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000574 raise IOError()
575 f.flush = bad_flush
576 self.assertRaises(IOError, f.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600577 self.assertTrue(f.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200578 self.assertTrue(closed) # flush() called
579 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchaka437d5352015-02-23 00:28:38 +0200580 f.flush = lambda: None # break reference loop
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200581
582 def test_flush_error_on_close(self):
583 # raw file
584 # Issue #5700: io.FileIO calls flush() after file closed
585 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
586 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
587 self.check_flush_error_on_close(fd, 'wb', buffering=0)
588 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
589 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
590 os.close(fd)
591 # buffered io
592 self.check_flush_error_on_close(support.TESTFN, 'wb')
593 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
594 self.check_flush_error_on_close(fd, 'wb')
595 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
596 self.check_flush_error_on_close(fd, 'wb', closefd=False)
597 os.close(fd)
598 # text io
599 self.check_flush_error_on_close(support.TESTFN, 'w')
600 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
601 self.check_flush_error_on_close(fd, 'w')
602 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
603 self.check_flush_error_on_close(fd, 'w', closefd=False)
604 os.close(fd)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000605
606 def test_multi_close(self):
607 f = self.open(support.TESTFN, "wb", buffering=0)
608 f.close()
609 f.close()
610 f.close()
611 self.assertRaises(ValueError, f.flush)
612
Antoine Pitrou6391b342010-09-14 18:48:19 +0000613 def test_RawIOBase_read(self):
614 # Exercise the default RawIOBase.read() implementation (which calls
615 # readinto() internally).
616 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
617 self.assertEqual(rawio.read(2), b"ab")
618 self.assertEqual(rawio.read(2), b"c")
619 self.assertEqual(rawio.read(2), b"d")
620 self.assertEqual(rawio.read(2), None)
621 self.assertEqual(rawio.read(2), b"ef")
622 self.assertEqual(rawio.read(2), b"g")
623 self.assertEqual(rawio.read(2), None)
624 self.assertEqual(rawio.read(2), b"")
625
Hynek Schlawack877effc2012-05-25 09:24:18 +0200626 def test_fileio_closefd(self):
627 # Issue #4841
628 with self.open(__file__, 'rb') as f1, \
629 self.open(__file__, 'rb') as f2:
630 fileio = self.FileIO(f1.fileno(), closefd=False)
631 # .__init__() must not close f1
632 fileio.__init__(f2.fileno(), closefd=False)
633 f1.readline()
634 # .close() must not close f2
635 fileio.close()
636 f2.readline()
637
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +0300638 def test_nonbuffered_textio(self):
639 with warnings.catch_warnings(record=True) as recorded:
640 with self.assertRaises(ValueError):
641 self.open(support.TESTFN, 'w', buffering=0)
642 support.gc_collect()
643 self.assertEqual(recorded, [])
644
645 def test_invalid_newline(self):
646 with warnings.catch_warnings(record=True) as recorded:
647 with self.assertRaises(ValueError):
648 self.open(support.TESTFN, 'w', newline='invalid')
649 support.gc_collect()
650 self.assertEqual(recorded, [])
651
Hynek Schlawack877effc2012-05-25 09:24:18 +0200652
class CIOTest(IOTest):
    """IOTest plus a regression test aimed at the C implementation."""

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        # Self-reference, so the instance can only be freed by the cycle
        # collector.
        obj.obj = obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000672
class PyIOTest(IOTest):
    """IOTest variant in which test_array_writes is skipped."""
    # Replace the inherited test with a skipped wrapper; the reason string
    # below is what the test runner reports.
    test_array_writes = unittest.skip(
        "len(array.array) returns number of elements rather than bytelength"
    )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000677
678
Antoine Pitrou19690592009-06-12 20:14:08 +0000679class CommonBufferedTests:
680 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
681
682 def test_detach(self):
683 raw = self.MockRawIO()
684 buf = self.tp(raw)
685 self.assertIs(buf.detach(), raw)
686 self.assertRaises(ValueError, buf.detach)
687
Benjamin Peterson53ae6142014-12-21 20:51:50 -0600688 repr(buf) # Should still work
689
Antoine Pitrou19690592009-06-12 20:14:08 +0000690 def test_fileno(self):
691 rawio = self.MockRawIO()
692 bufio = self.tp(rawio)
693
Ezio Melotti2623a372010-11-21 13:34:58 +0000694 self.assertEqual(42, bufio.fileno())
Antoine Pitrou19690592009-06-12 20:14:08 +0000695
Zachary Ware1f702212013-12-10 14:09:20 -0600696 @unittest.skip('test having existential crisis')
Antoine Pitrou19690592009-06-12 20:14:08 +0000697 def test_no_fileno(self):
698 # XXX will we always have fileno() function? If so, kill
699 # this test. Else, write it.
700 pass
701
702 def test_invalid_args(self):
703 rawio = self.MockRawIO()
704 bufio = self.tp(rawio)
705 # Invalid whence
706 self.assertRaises(ValueError, bufio.seek, 0, -1)
707 self.assertRaises(ValueError, bufio.seek, 0, 3)
708
709 def test_override_destructor(self):
710 tp = self.tp
711 record = []
712 class MyBufferedIO(tp):
713 def __del__(self):
714 record.append(1)
715 try:
716 f = super(MyBufferedIO, self).__del__
717 except AttributeError:
718 pass
719 else:
720 f()
721 def close(self):
722 record.append(2)
723 super(MyBufferedIO, self).close()
724 def flush(self):
725 record.append(3)
726 super(MyBufferedIO, self).flush()
727 rawio = self.MockRawIO()
728 bufio = MyBufferedIO(rawio)
729 writable = bufio.writable()
730 del bufio
731 support.gc_collect()
732 if writable:
733 self.assertEqual(record, [1, 2, 3])
734 else:
735 self.assertEqual(record, [1, 2])
736
737 def test_context_manager(self):
738 # Test usability as a context manager
739 rawio = self.MockRawIO()
740 bufio = self.tp(rawio)
741 def _with():
742 with bufio:
743 pass
744 _with()
745 # bufio should now be closed, and using it a second time should raise
746 # a ValueError.
747 self.assertRaises(ValueError, _with)
748
749 def test_error_through_destructor(self):
750 # Test that the exception state is not modified by a destructor,
751 # even if close() fails.
752 rawio = self.CloseFailureIO()
753 def f():
754 self.tp(rawio).xyzzy
755 with support.captured_output("stderr") as s:
756 self.assertRaises(AttributeError, f)
757 s = s.getvalue().strip()
758 if s:
759 # The destructor *may* have printed an unraisable error, check it
760 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000761 self.assertTrue(s.startswith("Exception IOError: "), s)
762 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000763
764 def test_repr(self):
765 raw = self.MockRawIO()
766 b = self.tp(raw)
767 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
768 self.assertEqual(repr(b), "<%s>" % clsname)
769 raw.name = "dummy"
770 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
771 raw.name = b"dummy"
772 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000773
def test_flush_error_on_close(self):
    # A failing flush() must not prevent close() from closing both the
    # buffered object and the raw stream, and flush() has to run while
    # both are still open.
    raw = self.MockRawIO()
    states_at_flush = []

    def failing_flush():
        # Snapshot the "closed" flags as seen from inside flush().
        states_at_flush[:] = [buffered.closed, raw.closed]
        raise IOError()

    raw.flush = failing_flush
    buffered = self.tp(raw)
    with self.assertRaises(IOError):  # exception not swallowed
        buffered.close()
    self.assertTrue(buffered.closed)
    self.assertTrue(raw.closed)
    self.assertTrue(states_at_flush)  # flush() called
    buffered_was_closed, raw_was_closed = states_at_flush
    self.assertFalse(buffered_was_closed)  # flush() called before file closed
    self.assertFalse(raw_was_closed)
    raw.flush = lambda: None  # break reference loop
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600791
def test_close_error_on_close(self):
    # When both flush() and the raw close() fail, the error surfaced by
    # close() is the one from the raw close(), and the buffered object
    # does not report itself as closed.
    raw = self.MockRawIO()

    def failing_close():
        raise IOError('close')

    def failing_flush():
        raise IOError('flush')

    raw.close = failing_close
    buffered = self.tp(raw)
    buffered.flush = failing_flush
    with self.assertRaises(IOError) as caught:  # exception not swallowed
        buffered.close()
    self.assertEqual(caught.exception.args, ('close',))
    self.assertFalse(buffered.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000805
def test_multi_close(self):
    # close() may be called repeatedly; flush() on the closed object
    # raises ValueError.
    buffered = self.tp(self.MockRawIO())
    for _ in range(3):
        buffered.close()
    with self.assertRaises(ValueError):
        buffered.flush()
813
def test_readonly_attributes(self):
    # The ``raw`` attribute cannot be rebound on a buffered object.
    buffered = self.tp(self.MockRawIO())
    replacement = self.MockRawIO()
    with self.assertRaises((AttributeError, TypeError)):
        buffered.raw = replacement
820
Christian Heimes1a6387e2008-03-26 12:49:49 +0000821
class SizeofTest:
    # Mixin: sys.getsizeof() of a buffered object must grow by exactly the
    # difference in buffer size.

    @support.cpython_only
    def test_sizeof(self):
        small, large = 4096, 8192

        def object_size(buffer_size):
            # Size of a fresh buffered object created with *buffer_size*.
            buffered = self.tp(self.MockRawIO(), buffer_size=buffer_size)
            return sys.getsizeof(buffered)

        # Whatever remains after subtracting the buffer is fixed overhead.
        overhead = object_size(small) - small
        self.assertEqual(object_size(large), overhead + large)
834
835
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Tests shared by every BufferedReader implementation; concrete
    # subclasses provide the class under test as ``tp``.
    read_mode = "rb"

    def test_constructor(self):
        # __init__ can be re-run on a live object with any positive
        # buffer size; non-positive sizes raise ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        for good_size in (1024, 16):
            bufio.__init__(rawio, buffer_size=good_size)
        self.assertEqual(b"abc", bufio.read())
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
        # A successful re-initialisation makes the object usable again.
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_uninitialized(self):
        cls = self.tp
        # Creating and dropping a never-initialised instance.
        discarded = cls.__new__(cls)
        del discarded
        bufio = cls.__new__(cls)
        # Reading before __init__ fails with one of two accepted errors.
        with self.assertRaisesRegexp((ValueError, AttributeError),
                                     'uninitialized|has no attribute'):
            bufio.read(0)
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.read(0), b'')

    def test_read(self):
        # read(None) and read(7) both return the whole 7-byte payload.
        for size in (None, 7):
            bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
            self.assertEqual(b"abcdefg", bufio.read(size))
        # Invalid args
        with self.assertRaises(ValueError):
            bufio.read(-2)

    def test_read1(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        # (requested size, expected data, raw reads performed so far)
        steps = (
            (1, b"b", 1),
            (100, b"c", 1),
            (100, b"d", 2),
            (100, b"efg", 3),
            (100, b"", 4),
        )
        for size, expected, raw_reads in steps:
            self.assertEqual(expected, bufio.read1(size))
            self.assertEqual(rawio._reads, raw_reads)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
        target = bytearray(2)
        # (bytes reported as read, buffer content afterwards); a short
        # read leaves the tail of the buffer untouched.
        expectations = (
            (2, b"ab"),
            (2, b"cd"),
            (2, b"ef"),
            (1, b"gf"),
            (0, b"gf"),
        )
        for count_read, content in expectations:
            self.assertEqual(bufio.readinto(target), count_read)
            self.assertEqual(target, content)

    def test_readlines(self):
        def make_reader():
            # Fresh reader over the same three raw chunks.
            return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))

        every_line = [b"abc\n", b"d\n", b"ef"]
        self.assertEqual(make_reader().readlines(), every_line)
        self.assertEqual(make_reader().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(make_reader().readlines(None), every_line)

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each case: buffer size, sizes requested from the buffered
        # reader, sizes expected to be requested from the raw stream.
        cases = [
            [100, [3, 1, 4, 8], [dlen, 0]],
            [100, [3, 3, 3], [dlen]],
            [4, [1, 2, 4, 2], [4, 4, 1]],
        ]

        for bufsize, buffered_sizes, expected_raw_sizes in cases:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            offset = 0
            for nbytes in buffered_sizes:
                expected = data[offset:offset + nbytes]
                self.assertEqual(bufio.read(nbytes), expected)
                offset += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, expected_raw_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        chunks = (b"abc", b"d", None, b"efg", None, None, None)
        bufio = self.tp(self.MockRawIO(chunks))
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        # The raw mock's own readall() follows the same convention.
        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        # Requesting far more than is available returns what there is.
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        # read() without a size drains every raw chunk.
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []

                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for size in cycle([1, 19]):
                            piece = bufio.read(size)
                            if not piece:
                                break
                            # list.append() is atomic
                            results.append(piece)
                    except Exception as exc:
                        errors.append(exc)
                        raise

                workers = [threading.Thread(target=reader) for _ in range(20)]
                for worker in workers:
                    worker.start()
                time.sleep(0.02)  # yield
                for worker in workers:
                    worker.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                # Every byte value must have been read exactly N times.
                combined = b''.join(results)
                for value in range(256):
                    needle = bytes(bytearray([value]))
                    self.assertEqual(combined.count(needle), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # seek() and tell() on top of a misbehaving raw stream raise
        # IOError.
        bufio = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16

        def check(chunks, n):
            # Read n bytes through a fresh reader over *chunks* and verify
            # the raw mock recorded no extraneous read.
            rawio = self.MockRawIO(chunks)
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                "failed for {}: {} != 0".format(n, rawio._extraneous_reads))

        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            # Simple case: one raw read is enough to satisfy the request.
            check([b"x" * n], n)
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            check([b"x" * (n - 1), b"x"], n)
1027
1028
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the shared BufferedReader tests against io.BufferedReader and
    # adds checks specific to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After every rejected __init__ call the object refuses to read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # Opening TESTFN in "w+b" mode creates the file on disk; register
        # its removal so the test does not leave it behind.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Self-reference: only the cycle collector can free the object.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegexp(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1074
1075
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared BufferedReader tests against the pyio implementation.
    tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001078
1079
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Tests shared by every BufferedWriter implementation; concrete
    # subclasses provide the class under test as ``tp``.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ can be re-run on a live object with any positive
        # buffer size; non-positive sizes raise ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        for good_size in (1024, 16):
            bufio.__init__(rawio, buffer_size=good_size)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
        # A successful re-initialisation makes the object usable again.
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_uninitialized(self):
        cls = self.tp
        # Creating and dropping a never-initialised instance.
        discarded = cls.__new__(cls)
        del discarded
        bufio = cls.__new__(cls)
        # Writing before __init__ fails with one of two accepted errors.
        with self.assertRaisesRegexp((ValueError, AttributeError),
                                     'uninitialized|has no attribute'):
            bufio.write(b'')
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.write(b''), 0)

    def test_detach_flush(self):
        # detach() pushes the buffered bytes to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for start in range(0, len(contents), 3):
            bufio.write(contents[start:start + 3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # *intermediate_func* is called with the buffered writer after
        # every single write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        sizes = (size for size in count(1) for _ in range(15))
        while n < len(contents):
            # Never ask for more than what is left to write.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n + size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents,
            b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        def _seekabs(bufio):
            # Absolute seeks around the current position and back to it.
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)

        def _seekrel(bufio):
            # Relative seeks, then back to the starting position.
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Data written after seeking back overwrites in place, and the
        # later seek forward makes it visible in the raw stream.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        lines = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(lines)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # writelines() also accepts a UserList of bytes objects.
        lines = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(lines)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        # Non-bytes items and a non-iterable argument raise TypeError.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)

    def test_destructor(self):
        # Collecting the writer flushes its pending data to the raw stream.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        #
        # The test writes to TESTFN; register its removal so the file is
        # not left on disk whether the assertions pass or fail.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # Truncating does not move the stream position.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n + size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []

                def writer():
                    try:
                        while True:
                            try:
                                chunk = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise

                workers = [threading.Thread(target=writer) for _ in range(20)]
                for worker in workers:
                    worker.start()
                time.sleep(0.02)  # yield
                for worker in workers:
                    worker.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Every byte value must have been written exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # seek(), tell() and an over-sized write on top of a misbehaving
        # raw stream raise IOError.
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument triggers the deprecation
        # warning checked below.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        # A raw write() failure during close() propagates, and the
        # buffered object still ends up closed.
        raw = self.MockRawIO()

        def bad_write(b):
            raise IOError()

        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(IOError, b.close)  # exception not swallowed
        self.assertTrue(b.closed)
1338
Antoine Pitrou19690592009-06-12 20:14:08 +00001339
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared BufferedWriter tests against io.BufferedWriter and
    # adds checks specific to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After every rejected __init__ call the object refuses to write.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # The test creates TESTFN on disk; register its removal so the
        # file is not left behind.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Self-reference: only the cycle collector can free the object.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1382
Antoine Pitrou19690592009-06-12 20:14:08 +00001383
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriter tests against the pyio implementation.
    tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001386
1387class BufferedRWPairTest(unittest.TestCase):
1388
def test_constructor(self):
    # A pair built from two raw mocks starts out open.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    pair = self.tp(reader, writer)
    self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001392
def test_uninitialized(self):
    cls = self.tp
    # Creating and dropping a never-initialised instance.
    discarded = cls.__new__(cls)
    del discarded
    pair = cls.__new__(cls)
    # Both directions fail before __init__ with one of two accepted errors.
    with self.assertRaisesRegexp((ValueError, AttributeError),
                                 'uninitialized|has no attribute'):
        pair.read(0)
    with self.assertRaisesRegexp((ValueError, AttributeError),
                                 'uninitialized|has no attribute'):
        pair.write(b'')
    pair.__init__(self.MockRawIO(), self.MockRawIO())
    self.assertEqual(pair.read(0), b'')
    self.assertEqual(pair.write(b''), 0)
1406
def test_detach(self):
    # detach() is not supported on a reader/writer pair.
    pair = self.tp(self.MockRawIO(), self.MockRawIO())
    with self.assertRaises(self.UnsupportedOperation):
        pair.detach()
1410
def test_constructor_max_buffer_size_deprecation(self):
    # Passing a fourth positional argument triggers the deprecation
    # warning checked below.
    expected_warning = ("max_buffer_size is deprecated", DeprecationWarning)
    with support.check_warnings(expected_warning):
        self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001415
def test_constructor_with_not_readable(self):
    # A reader whose readable() returns False is rejected with IOError.
    class NotReadable(MockRawIO):
        def readable(self):
            return False

    with self.assertRaises(IOError):
        self.tp(NotReadable(), self.MockRawIO())
1422
def test_constructor_with_not_writeable(self):
    # A writer whose writable() returns False is rejected with IOError.
    class NotWriteable(MockRawIO):
        def writable(self):
            return False

    with self.assertRaises(IOError):
        self.tp(self.MockRawIO(), NotWriteable())
1429
def test_read(self):
    # Sized reads, then a read to EOF, all come from the reader side.
    pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
    for size, expected in ((3, b"abc"), (1, b"d")):
        self.assertEqual(pair.read(size), expected)
    self.assertEqual(pair.read(), b"ef")
    # read(None) returns everything available.
    pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
    self.assertEqual(pair.read(None), b"abc")
1438
def test_readlines(self):
    # readlines() on the pair reads from the reader side; it is checked
    # with no hint, with an explicit None hint and with a small hint.
    def make_pair():
        # Fresh pair over the same three-line payload.
        return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

    every_line = [b"abc\n", b"def\n", b"h"]
    self.assertEqual(make_pair().readlines(), every_line)
    # This assertion was an exact duplicate of the one above; it now
    # covers the explicit-None hint.
    self.assertEqual(make_pair().readlines(None), every_line)
    self.assertEqual(make_pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001444
def test_read1(self):
    # .read1() is delegated to the underlying reader object, so this test
    # can be shallow.
    source = self.BytesIO(b"abcdef")
    pair = self.tp(source, self.MockRawIO())
    self.assertEqual(pair.read1(3), b"abc")
1451
def test_readinto(self):
    # readinto() fills the whole 5-byte buffer from the reader side.
    source = self.BytesIO(b"abcdef")
    pair = self.tp(source, self.MockRawIO())
    target = bytearray(5)
    self.assertEqual(pair.readinto(target), 5)
    self.assertEqual(target, b"abcde")
1458
def test_write(self):
    # Each write followed by flush() reaches the writer side as one
    # separate chunk.
    sink = self.MockRawIO()
    pair = self.tp(self.MockRawIO(), sink)
    for chunk in (b"abc", b"def"):
        pair.write(chunk)
        pair.flush()
    self.assertEqual(sink._write_stack, [b"abc", b"def"])
1468
def test_peek(self):
    # peek() shows the upcoming bytes without consuming them.
    source = self.BytesIO(b"abcdef")
    pair = self.tp(source, self.MockRawIO())
    peeked = pair.peek(3)
    self.assertTrue(peeked.startswith(b"abc"))
    self.assertEqual(pair.read(3), b"abc")
1474
def test_readable(self):
    # A pair built from two mock raw streams reports itself readable.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    rw_pair = self.tp(reader, writer)
    self.assertTrue(rw_pair.readable())
1478
def test_writeable(self):
    # A pair built from two mock raw streams reports itself writable.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    rw_pair = self.tp(reader, writer)
    self.assertTrue(rw_pair.writable())
1482
def test_seekable(self):
    # BufferedRWPairs are never seekable, even if their readers and writers
    # are.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    rw_pair = self.tp(reader, writer)
    self.assertFalse(rw_pair.seekable())
1488
1489 # .flush() is delegated to the underlying writer object and has been
1490 # tested in the test_write method.
1491
def test_close_and_closed(self):
    reader, writer = self.MockRawIO(), self.MockRawIO()
    rw_pair = self.tp(reader, writer)
    # The closed flag flips from false to true across close().
    self.assertFalse(rw_pair.closed)
    rw_pair.close()
    self.assertTrue(rw_pair.closed)
1497
def test_reader_close_error_on_close(self):
    # Only the reader's close() fails (NameError from an undefined name).
    def failing_close():
        reader_non_existing

    reader = self.MockRawIO()
    reader.close = failing_close
    writer = self.MockRawIO()
    rw_pair = self.tp(reader, writer)

    with self.assertRaises(NameError) as caught:
        rw_pair.close()
    self.assertIn('reader_non_existing', str(caught.exception))

    # The writer side is still expected to end up closed, and the pair
    # with it; the reader whose close() raised stays open.
    self.assertTrue(rw_pair.closed)
    self.assertFalse(reader.closed)
    self.assertTrue(writer.closed)
1511
def test_writer_close_error_on_close(self):
    # Only the writer's close() fails (NameError from an undefined name).
    def failing_close():
        writer_non_existing

    reader = self.MockRawIO()
    writer = self.MockRawIO()
    writer.close = failing_close
    rw_pair = self.tp(reader, writer)

    with self.assertRaises(NameError) as caught:
        rw_pair.close()
    self.assertIn('writer_non_existing', str(caught.exception))

    # The reader is still expected to be closed; the pair and the writer
    # whose close() raised are expected to report not closed.
    self.assertFalse(rw_pair.closed)
    self.assertTrue(reader.closed)
    self.assertFalse(writer.closed)
1525
def test_reader_writer_close_error_on_close(self):
    # Both sides fail on close(), each with its own undefined name.
    def failing_reader_close():
        reader_non_existing

    def failing_writer_close():
        writer_non_existing

    reader = self.MockRawIO()
    reader.close = failing_reader_close
    writer = self.MockRawIO()
    writer.close = failing_writer_close
    rw_pair = self.tp(reader, writer)

    with self.assertRaises(NameError) as caught:
        rw_pair.close()
    # The error that propagates is expected to be the reader's.
    self.assertIn('reader_non_existing', str(caught.exception))

    # Nothing managed to close.
    self.assertFalse(rw_pair.closed)
    self.assertFalse(reader.closed)
    self.assertFalse(writer.closed)
1542
def test_isatty(self):
    # Raw stream stub whose isatty() answer is fixed at construction.
    class FixedIsAtty(MockRawIO):
        def __init__(self, answer):
            MockRawIO.__init__(self)
            self._isatty = answer

        def isatty(self):
            return self._isatty

    # (reader answer, writer answer): the pair is expected to report a
    # tty as soon as either side does.
    combos = [(False, False), (True, False), (False, True), (True, True)]
    for reader_tty, writer_tty in combos:
        rw_pair = self.tp(FixedIsAtty(reader_tty), FixedIsAtty(writer_tty))
        if reader_tty or writer_tty:
            self.assertTrue(rw_pair.isatty())
        else:
            self.assertFalse(rw_pair.isatty())
1563
def test_weakref_clearing(self):
    rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
    watcher = weakref.ref(rw_pair)
    # Drop the referent first, then the weak reference to it.
    rw_pair = None
    watcher = None  # Shouldn't segfault.
1569
class CBufferedRWPairTest(BufferedRWPairTest):
    # Run the shared BufferedRWPair tests against io.BufferedRWPair.
    tp = io.BufferedRWPair
1572
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Run the shared BufferedRWPair tests against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001575
1576
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Shared tests for a buffered object that both reads and writes.
    # Subclasses set ``tp`` to the implementation under test.  Several
    # tests simply run the reader and the writer variant of an inherited
    # test back to back.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Run both inherited constructor tests explicitly.
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_uninitialized(self):
        # Run both inherited uninitialized-object tests explicitly.
        BufferedReaderTest.test_uninitialized(self)
        BufferedWriterTest.test_uninitialized(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        # The read below is expected to push the buffered writes to raw,
        # as one combined chunk.
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite four bytes at position 4, then re-read from the start.
        rw.write(b"123f")
        rw.seek(0, 0)
        self.assertEqual(b"asdf123fl", rw.read())
        self.assertEqual(9, rw.tell())
        # whence=2: relative to the end; whence=1: relative to current.
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        # Asking for more than remains returns just the tail.
        self.assertEqual(b"fl", rw.read(11))
        rw.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        # A float offset is rejected.
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario for the test_flush_and_* tests.  ``read_func`` is
        # called as read_func(bufio[, n]) and must return the bytes it
        # consumed from ``bufio``.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        # Read "ab", overwrite "cd" with "12", then continue reading at "ef".
        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        # flush() must not move the logical position.
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Modify the raw stream directly, bypassing the buffered object.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # n < 0 means "read everything": use an oversized buffer.
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance, so consume the bytes by seeking.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        # Two flushed writes overwrite the first five bytes in sequence.
        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        # Run both inherited thread tests explicitly.
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        # check_writes() is defined outside this class body; the callback
        # passed to it receives the buffered object.
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        # Second variant: step back one byte, peek, restore the position.
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        # Callback: step back one byte and read it again.
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        # Callback: step back one byte and read1() it again.
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        # Callback: step back one byte and readinto() a 1-byte buffer.
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        # Buffer size is 4, so the sizes cover a write smaller than the
        # buffer (1) and one larger than it (5).
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        b = b"\x80\x81\x82\x83\x84"
        # Try every (i, j) pair with i <= j over the five byte positions.
        for i in range(0, len(b)):
            for j in range(i, len(b)):
                raw = self.BytesIO(b)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                # Same order as mutate(): when i == j the later write (1)
                # wins.
                expected = bytearray(b)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        # truncate() without an argument is expected to cut at the logical
        # position, not at wherever the raw stream happens to be.
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        # Run both inherited misbehaved-raw-stream tests explicitly.
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        # Alternate one-byte writes with each flavour of read; every
        # operation must continue at the position the previous one left.
        with self.BytesIO(b'abcdefgh') as raw:
            with self.tp(raw, 100) as f:
                f.write(b"1")
                self.assertEqual(f.read(1), b'b')
                f.write(b'2')
                self.assertEqual(f.read1(1), b'd')
                f.write(b'3')
                buf = bytearray(1)
                f.readinto(buf)
                self.assertEqual(buf, b'f')
                f.write(b'4')
                self.assertEqual(f.peek(1), b'h')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        # Same idea, but starting with a read instead of a write.
        with self.BytesIO(b'abc') as raw:
            with self.tp(raw, 100) as f:
                self.assertEqual(f.read(1), b'a')
                f.write(b"2")
                self.assertEqual(f.read(1), b'c')
                f.flush()
                self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Each write overwrites the first byte of the next line; readline()
        # then returns the rest of that line.
        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
            with self.tp(raw) as f:
                f.write(b'1')
                self.assertEqual(f.readline(), b'b\n')
                f.write(b'2')
                self.assertEqual(f.readline(), b'def\n')
                f.write(b'3')
                self.assertEqual(f.readline(), b'\n')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1801
R David Murray5b2cf5e2013-02-23 22:11:21 -05001802
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
                          BufferedRandomTest, SizeofTest):
    # Run the BufferedRandom tests against io.BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        # Re-initialising with an absurd buffer size must fail cleanly.
        too_big_errors = (OverflowError, MemoryError, ValueError)
        with self.assertRaises(too_big_errors):
            buffered.__init__(raw, sys.maxsize)

    def test_garbage_collection(self):
        # Run both inherited garbage-collection tests explicitly.
        CBufferedReaderTest.test_garbage_collection(self)
        CBufferedWriterTest.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError text must name
        # BufferedRandom.
        with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1825
1826
class PyBufferedRandomTest(BufferedRandomTest):
    # Run the BufferedRandom tests against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1829
1830
Christian Heimes1a6387e2008-03-26 12:49:49 +00001831# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1832# properties:
1833# - A single output character can correspond to many bytes of input.
1834# - The number of input bytes to complete the character can be
1835# undetermined until the last input byte is received.
1836# - The number of input bytes can vary depending on previous input.
1837# - A single input byte can correspond to many characters of output.
1838# - The number of output characters can be undetermined until the
1839# last input byte is received.
1840# - The number of output characters can vary depending on previous input.
1841
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1, nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags), flags = (I^1)*100 + (O^1)."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a (buffered bytes, flags) pair produced by getstate()."""
        # Local names chosen so they shadow neither the io module nor the
        # Python 2 ``buffer`` builtin.
        pending, flags = state
        self.buffer = bytearray(pending)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed *input* (a bytes-like object) and return the decoded text.

        Completed words are converted by process_word(); an incomplete
        trailing word stays buffered unless *final* is true.
        """
        output = ''
        period = ord('.')
        # Iterate over integer byte values.  Comparing each element with a
        # text literal only works when iteration yields 1-char byte strings;
        # going through bytearray() makes bytes and bytearray input behave
        # the same and matches the ord() comparisons in process_word().
        for b in bytearray(input):
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer: # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Consume the buffered word; return its output text ('' for i/o)."""
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Gate for lookupTestDecoder(); off by default.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: answer only for 'test_decoder' and only
        while codecEnabled is true; otherwise fall through (return None)."""
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
1926
# Register the decoder defined above with the codec registry for testing.
# The lookup function only answers for 'test_decoder', and only while
# StatefulIncrementalDecoder.codecEnabled is true.  It is disabled by
# default; tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1930
1931
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases, each on a brand-new decoder.
        for data, at_eof, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, at_eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001974
1975class TextIOWrapperTest(unittest.TestCase):
1976
def setUp(self):
    # Fixture with mixed line endings, plus the same text with every
    # line ending written as "\n".
    mixed_newlines = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.testdata = mixed_newlines
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
    # Start each test without a leftover scratch file.
    support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001981
def tearDown(self):
    # Remove the scratch file named by support.TESTFN after each test.
    support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001984
def test_constructor(self):
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)

    # Calling __init__ again must replace the previous settings.
    text.__init__(buffered, encoding="latin1", newline="\r\n")
    self.assertEqual(text.encoding, "latin1")
    self.assertEqual(text.line_buffering, False)

    text.__init__(buffered, encoding="utf8", line_buffering=True)
    self.assertEqual(text.encoding, "utf8")
    self.assertEqual(text.line_buffering, True)
    # The two raw bytes \xc3\xa9 decode as one character under utf8.
    self.assertEqual("\xe9\n", text.readline())

    # Bad newline arguments: wrong type, then unsupported value.
    with self.assertRaises(TypeError):
        text.__init__(buffered, newline=42)
    with self.assertRaises(ValueError):
        text.__init__(buffered, newline='xyzzy')
1998
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    text = self.TextIOWrapper(buffered)
    # detach() hands back the wrapped buffer object itself.
    self.assertIs(text.detach(), buffered)

    text = self.TextIOWrapper(buffered, encoding="ascii")
    text.write("howdy")
    # Nothing has reached the raw stream yet ...
    self.assertFalse(raw.getvalue())
    text.detach()
    # ... but it has once the wrapper is detached.
    self.assertEqual(raw.getvalue(), b"howdy")
    # Detaching a second time is an error.
    with self.assertRaises(ValueError):
        text.detach()

    # Operations independent of the detached stream should still work
    repr(text)
    self.assertEqual(text.encoding, "ascii")
    self.assertEqual(text.errors, "strict")
    self.assertFalse(text.line_buffering)
2017
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    # No name on the raw stream: only the encoding is shown.
    expected = "<%s.TextIOWrapper encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    # A text name (unicode_literals is in effect) shows with a u prefix.
    raw.name = "dummy"
    expected = "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    # A bytes name shows without one.
    raw.name = b"dummy"
    expected = "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    text.buffer.detach()
    repr(text)  # Should not raise an exception
2034
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)

    # (text written, raw content expected right after that write)
    steps = [
        ("X", b""),                  # No flush happened
        ("Y\nZ", b"XY\nZ"),          # All got flushed
        ("A\rB", b"XY\nZA\rB"),
    ]
    for chunk, flushed_so_far in steps:
        text.write(chunk)
        self.assertEqual(raw.getvalue(), flushed_so_far)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002045
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()
    # An explicit encoding is reported back unchanged.
    text = self.TextIOWrapper(raw, encoding="utf8")
    self.assertEqual(text.encoding, "utf8")
    # Without one, some default must be chosen ...
    text = self.TextIOWrapper(raw)
    self.assertIsNotNone(text.encoding)
    # ... and it must name a codec that exists (lookup raises otherwise).
    codecs.lookup(text.encoding)
2054
def test_encoding_errors_reading(self):
    # The \xff byte cannot be decoded as ASCII.
    payload = b"abc\n\xff\n"

    # (1) default
    text = self.TextIOWrapper(self.BytesIO(payload), encoding="ascii")
    with self.assertRaises(UnicodeError):
        text.read()

    # (2) explicit strict
    text = self.TextIOWrapper(self.BytesIO(payload), encoding="ascii",
                              errors="strict")
    with self.assertRaises(UnicodeError):
        text.read()

    # (3) ignore
    text = self.TextIOWrapper(self.BytesIO(payload), encoding="ascii",
                              errors="ignore")
    self.assertEqual(text.read(), "abc\n\n")

    # (4) replace
    text = self.TextIOWrapper(self.BytesIO(payload), encoding="ascii",
                              errors="replace")
    self.assertEqual(text.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002072
def test_encoding_errors_writing(self):
    # "\xff" cannot be encoded as ASCII.
    # (1) default
    text = self.TextIOWrapper(self.BytesIO(), encoding="ascii")
    with self.assertRaises(UnicodeError):
        text.write("\xff")

    # (2) explicit strict
    text = self.TextIOWrapper(self.BytesIO(), encoding="ascii",
                              errors="strict")
    with self.assertRaises(UnicodeError):
        text.write("\xff")

    # (3) ignore
    sink = self.BytesIO()
    text = self.TextIOWrapper(sink, encoding="ascii", errors="ignore",
                              newline="\n")
    text.write("abc\xffdef\n")
    text.flush()
    self.assertEqual(sink.getvalue(), b"abcdef\n")

    # (4) replace
    sink = self.BytesIO()
    text = self.TextIOWrapper(sink, encoding="ascii", errors="replace",
                              newline="\n")
    text.write("abc\xffdef\n")
    text.flush()
    self.assertEqual(sink.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002096
def test_newlines(self):
    source_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]
    source_text = ''.join(source_lines)

    # (newline argument, lines expected back from the wrapper)
    expectations = [
        (None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']),
        ('', source_lines),
        ('\n', ["unix\n", "windows\r\n", "os9\rlast\n", "nonl"]),
        ('\r\n', ["unix\nwindows\r\n", "os9\rlast\nnonl"]),
        ('\r', ["unix\nwindows\r", "\nos9\r", "last\nnonl"]),
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        # XXX: str.encode() should return bytes
        data = bytes(source_text.encode(encoding))
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in expectations:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        # Mix read(2) with readline(): each line is read
                        # as two characters plus the rest of the line.
                        got_lines = []
                        head = textio.read(2)
                        while head != '':
                            self.assertEqual(len(head), 2)
                            got_lines.append(head + textio.readline())
                            head = textio.read(2)
                    else:
                        got_lines = list(textio)

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002138
def test_newlines_input(self):
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    # (newline argument, lines expected from readlines())
    cases = [
        (None, normalized.decode("ascii").splitlines(True)),
        ("", testdata.decode("ascii").splitlines(True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected_lines in cases:
        raw = self.BytesIO(testdata)
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        self.assertEqual(text.readlines(), expected_lines)
        # A full read() after rewinding must equal the joined lines.
        text.seek(0)
        self.assertEqual(text.read(), "".join(expected_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002154
def test_newlines_output(self):
    # newline argument -> bytes expected in the underlying stream.
    by_newline = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is expected to behave like the platform's os.linesep.
    cases = [(None, by_newline[os.linesep])] + sorted(by_newline.items())
    chunks = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in cases:
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        for chunk in chunks:
            text.write(chunk)
        text.flush()
        self.assertEqual(raw.closed, False)
        self.assertEqual(raw.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00002172
def test_destructor(self):
    seen_at_close = []
    parent = self.BytesIO

    # BytesIO variant that records its content at the moment it is closed.
    class RecordingBytesIO(parent):
        def close(self):
            seen_at_close.append(self.getvalue())
            parent.close(self)

    raw = RecordingBytesIO()
    text = self.TextIOWrapper(raw, encoding="ascii")
    text.write("abc")
    # Dropping the wrapper is expected to flush the pending text and
    # close the raw stream exactly once.
    del text
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Antoine Pitrou19690592009-06-12 20:14:08 +00002186
def test_override_destructor(self):
    calls = []

    # Subclass that logs 1 / 2 / 3 when __del__ / close / flush run.
    class TracingTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            try:
                parent_del = super(TracingTextIO, self).__del__
            except AttributeError:
                # The base class may not define __del__ at all.
                pass
            else:
                parent_del()

        def close(self):
            calls.append(2)
            super(TracingTextIO, self).close()

        def flush(self):
            calls.append(3)
            super(TracingTextIO, self).flush()

    raw = self.BytesIO()
    text = TracingTextIO(raw, encoding="ascii")
    del text
    support.gc_collect()
    # Destruction goes through the overridden methods, in this order.
    self.assertEqual(calls, [1, 2, 3])
2209
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        # The temporary wrapper is destroyed while the AttributeError is
        # propagating.
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    report = captured.getvalue().strip()
    if not report:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(report.splitlines()), 1)
    self.assertTrue(report.startswith("Exception IOError: "), report)
    self.assertTrue(report.endswith(" ignored"), report)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002224
2225 # Systematic tests of the text I/O API
2226
def test_basic_io(self):
    # Exercise write/read/seek/tell round trips for several encodings and
    # chunk sizes.  Files are opened through ``with`` so that a failing
    # assertion cannot leave the test file open (which would also make the
    # later unlink fail on platforms that lock open files).
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        # "utf-16-be" and "utf-16-le" are deliberately not covered here.
        for enc in "ascii", "latin1", "utf8":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Opaque position of the end of the initial contents.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2255
def multi_line_test(self, f, enc):
    # Helper: rewrite *f* with lines of many different lengths, recording
    # the tell() value before each write, then check that reading the file
    # back line by line yields the same (position, line) pairs.
    # *enc* is accepted for the caller's convenience but not used here.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        text = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        written.append((f.tell(), text))
        f.write(text)
    f.seek(0)
    read_back = []
    where = f.tell()
    text = f.readline()
    while text:
        read_back.append((where, text))
        where = f.tell()
        text = f.readline()
    self.assertEqual(read_back, written)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002277
def test_telling(self):
    # tell() must report consistent positions around multi-byte characters,
    # and must be refused while the file is being iterated over.
    # The file is opened with ``with`` so it is closed even when an
    # assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # Telling is disabled during iteration.
            self.assertRaises(IOError, f.tell)
        self.assertEqual(f.tell(), p2)
2297
def test_seeking(self):
    # Place a multi-byte character right after an ASCII prefix that is two
    # characters shorter than the default chunk size, then check read(),
    # tell() and readline() around that point.
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = bytes(u_suffix.encode("utf-8"))
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    # The reader used to be left open; close it deterministically.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002315
def test_seeking_too(self):
    # Regression test for a specific bug: tell() after readline() with a
    # chunk size smaller than one encoded character.
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    # The reader used to be left open; close it deterministically.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
2327
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened with ``with`` so that a failing assertion
        # cannot leak an open handle on the shared test file.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for raw_input_bytes, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(raw_input_bytes)

        # Position each test case so that it crosses a chunk boundary.
        for raw_input_bytes, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(raw_input_bytes)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + raw_input_bytes, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002374
Antoine Pitrou19690592009-06-12 20:14:08 +00002375 def test_encoded_writes(self):
2376 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002377 tests = ("utf-16",
2378 "utf-16-le",
2379 "utf-16-be",
2380 "utf-32",
2381 "utf-32-le",
2382 "utf-32-be")
2383 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002384 buf = self.BytesIO()
2385 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002386 # Check if the BOM is written only once (see issue1753).
2387 f.write(data)
2388 f.write(data)
2389 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002390 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002391 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002392 self.assertEqual(f.read(), data * 2)
2393 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002394
Antoine Pitrou19690592009-06-12 20:14:08 +00002395 def test_unreadable(self):
2396 class UnReadable(self.BytesIO):
2397 def readable(self):
2398 return False
2399 txt = self.TextIOWrapper(UnReadable())
2400 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002401
Antoine Pitrou19690592009-06-12 20:14:08 +00002402 def test_read_one_by_one(self):
2403 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002404 reads = ""
2405 while True:
2406 c = txt.read(1)
2407 if not c:
2408 break
2409 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002410 self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002411
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002412 def test_readlines(self):
2413 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2414 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2415 txt.seek(0)
2416 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2417 txt.seek(0)
2418 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2419
Christian Heimes1a6387e2008-03-26 12:49:49 +00002420 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002421 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002422 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002423 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002424 reads = ""
2425 while True:
2426 c = txt.read(128)
2427 if not c:
2428 break
2429 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002430 self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002431
Antoine Pitroueadca1d2012-10-16 23:02:27 +02002432 def test_writelines(self):
2433 l = ['ab', 'cd', 'ef']
2434 buf = self.BytesIO()
2435 txt = self.TextIOWrapper(buf)
2436 txt.writelines(l)
2437 txt.flush()
2438 self.assertEqual(buf.getvalue(), b'abcdef')
2439
def test_writelines_userlist(self):
    # writelines() must accept a list-like object (UserList), not only a
    # real list.
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink)
    wrapper.writelines(UserList(['ab', 'cd', 'ef']))
    wrapper.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
2447
2448 def test_writelines_error(self):
2449 txt = self.TextIOWrapper(self.BytesIO())
2450 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2451 self.assertRaises(TypeError, txt.writelines, None)
2452 self.assertRaises(TypeError, txt.writelines, b'abc')
2453
Christian Heimes1a6387e2008-03-26 12:49:49 +00002454 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002455 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002456
2457 # read one char at a time
2458 reads = ""
2459 while True:
2460 c = txt.read(1)
2461 if not c:
2462 break
2463 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002464 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002465
2466 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002467 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002468 txt._CHUNK_SIZE = 4
2469
2470 reads = ""
2471 while True:
2472 c = txt.read(4)
2473 if not c:
2474 break
2475 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002476 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002477
2478 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002479 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002480 txt._CHUNK_SIZE = 4
2481
2482 reads = txt.read(4)
2483 reads += txt.read(4)
2484 reads += txt.readline()
2485 reads += txt.readline()
2486 reads += txt.readline()
Ezio Melotti2623a372010-11-21 13:34:58 +00002487 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002488
2489 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002490 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002491 txt._CHUNK_SIZE = 4
2492
2493 reads = txt.read(4)
2494 reads += txt.read()
Ezio Melotti2623a372010-11-21 13:34:58 +00002495 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002496
2497 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002498 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002499 txt._CHUNK_SIZE = 4
2500
2501 reads = txt.read(4)
2502 pos = txt.tell()
2503 txt.seek(0)
2504 txt.seek(pos)
Ezio Melotti2623a372010-11-21 13:34:58 +00002505 self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002506
2507 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002508 buffer = self.BytesIO(self.testdata)
2509 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002510
2511 self.assertEqual(buffer.seekable(), txt.seekable())
2512
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN

    def raw_contents():
        # Read the file back as bytes, bypassing any text layer.
        with self.open(path, 'rb') as stream:
            return stream.read()

    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as f:
            f.write('aaa')
            f.tell()
        self.assertEqual(raw_contents(), 'aaa'.encode(charset))

        with self.open(path, 'a', encoding=charset) as f:
            f.write('xxx')
        self.assertEqual(raw_contents(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002527
def test_seek_bom(self):
    # Same as the append case, but positioning with seek(): writing at the
    # saved end position, then at offset 0, must leave a single BOM.
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as f:
            f.write('aaa')
            end_cookie = f.tell()
        with self.open(path, 'r+', encoding=charset) as f:
            f.seek(end_cookie)
            f.write('zzz')
            f.seek(0)
            f.write('bbb')
        with self.open(path, 'rb') as f:
            raw = f.read()
        self.assertEqual(raw, 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002542
def test_errors_property(self):
    # The ``errors`` attribute is "strict" by default and otherwise
    # reflects the value passed to open().
    cases = [({}, "strict"), ({"errors": "replace"}, "replace")]
    for extra, expected in cases:
        with self.open(support.TESTFN, "w", **extra) as f:
            self.assertEqual(f.errors, expected)
2548
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    start_signal = threading.Event()
    worker_count = 20
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def worker(n):
            # Build the line first, then wait so that all writers are
            # released at (roughly) the same moment.
            text = "Thread%03d\n" % n
            start_signal.wait()
            f.write(text)

        workers = [threading.Thread(target=worker, args=(n,))
                   for n in range(worker_count)]
        for thread in workers:
            thread.start()
        time.sleep(0.02)
        start_signal.set()
        for thread in workers:
            thread.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Each thread's line must appear exactly once.
        for n in range(worker_count):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002570
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002571 def test_flush_error_on_close(self):
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002572 # Test that text file is closed despite failed flush
2573 # and that flush() is called before file closed.
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002574 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002575 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002576 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002577 closed[:] = [txt.closed, txt.buffer.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002578 raise IOError()
2579 txt.flush = bad_flush
2580 self.assertRaises(IOError, txt.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -06002581 self.assertTrue(txt.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002582 self.assertTrue(txt.buffer.closed)
2583 self.assertTrue(closed) # flush() called
2584 self.assertFalse(closed[0]) # flush() called before file closed
2585 self.assertFalse(closed[1])
Serhiy Storchaka437d5352015-02-23 00:28:38 +02002586 txt.flush = lambda: None # break reference loop
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002587
2588 def test_multi_close(self):
2589 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2590 txt.close()
2591 txt.close()
2592 txt.close()
2593 self.assertRaises(ValueError, txt.flush)
2594
Antoine Pitroufc9ead62010-12-21 21:26:55 +00002595 def test_readonly_attributes(self):
2596 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2597 buf = self.BytesIO(self.testdata)
2598 with self.assertRaises((AttributeError, TypeError)):
2599 txt.buffer = buf
2600
def test_read_nonbytes(self):
    # Issue #17106
    # Crash when underlying read() returns non-bytes
    # (The stream class used to be defined twice, identically; one
    # definition is enough.)
    class NonbytesStream(self.StringIO):
        # Make read1() return text as well, like read().
        read1 = self.StringIO.read
    # maybeRaises() is supplied by the concrete test classes: it may either
    # require the TypeError or merely tolerate it.
    t = self.TextIOWrapper(NonbytesStream('a'))
    with self.maybeRaises(TypeError):
        t.read(1)
    t = self.TextIOWrapper(NonbytesStream('a'))
    with self.maybeRaises(TypeError):
        t.readline()
    t = self.TextIOWrapper(NonbytesStream('a'))
    self.assertEqual(t.read(), u'a')
2616
def test_illegal_decoder(self):
    # Issue #17106
    # Crash when decoder returns non-string
    read_calls = (
        lambda stream: stream.read(1),
        lambda stream: stream.readline(),
        lambda stream: stream.read(),
    )
    for do_read in read_calls:
        # A fresh wrapper is used for every kind of read.
        t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
                               encoding='quopri_codec')
        with self.maybeRaises(TypeError):
            do_read(t)
2632
2633
class CTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests plus a few checks that only
    # apply to this implementation.

    def test_initialization(self):
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        # After a failed re-initialization the wrapper must refuse reads.
        for expected_error, bad_newline in ((TypeError, 42),
                                            (ValueError, 'xyzzy')):
            self.assertRaises(expected_error, wrapper.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

        # repr() of an object that never went through __init__ must raise
        # an exception.
        wrapper = self.TextIOWrapper.__new__(self.TextIOWrapper)
        self.assertRaises(Exception, repr, wrapper)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(rawio)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Self-reference: only the cyclic GC can reclaim the object.
        wrapper.x = wrapper
        wr = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()

    # In this class the guarded operations are required to raise.
    maybeRaises = unittest.TestCase.assertRaises
2679
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002680
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests with a maybeRaises() that does
    # not require anything to be raised.

    @contextlib.contextmanager
    def maybeRaises(self, *_exc_types, **_options):
        # No-op context manager: the arguments are accepted and ignored,
        # and the body of the ``with`` block simply runs.
        yield
Antoine Pitrou19690592009-06-12 20:14:08 +00002685
2686
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Shared tests for IncrementalNewlineDecoder; the implementation under
    # test is read from ``self.IncrementalNewlineDecoder``.

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def roundtrip(data, expected, **kwargs):
            # We exercise getstate() / setstate() as well as decode():
            # decode once, restore the saved state, and decode again.
            saved = decoder.getstate()
            first = decoder.decode(data, **kwargs)
            self.assertEqual(first, expected)
            decoder.setstate(saved)
            second = decoder.decode(data, **kwargs)
            self.assertEqual(second, expected)

        # Multi-byte character fed whole, then one byte at a time (twice),
        # then left incomplete.
        multibyte_steps = [
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),
            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),
            (b'\xe8', ""),
        ]
        for data, expected in multibyte_steps:
            roundtrip(data, expected)
        # The dangling lead byte is an error once the input is final.
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'',
                          final=True)

        decoder.reset()
        # (input bytes, expected text, extra decode() keyword arguments)
        newline_steps = [
            (b'\n', "\n", {}),
            (b'\r', "", {}),
            (b'', "\n", {"final": True}),
            (b'\r', "\n", {"final": True}),

            (b'\r', "", {}),
            (b'a', "\na", {}),

            (b'\r\r\n', "\n\n", {}),
            (b'\r', "", {}),
            (b'\r', "\n", {}),
            (b'\na', "\na", {}),

            (b'\xe8\xa2\x88\r\n', "\u8888\n", {}),
            (b'\xe8\xa2\x88', "\u8888", {}),
            (b'\n', "\n", {}),
            (b'\xe8\xa2\x88\r', "\u8888", {}),
            (b'\n', "\n", {}),
        ]
        for data, expected, options in newline_steps:
            roundtrip(data, expected, **options)

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to *decoder* one unit at a time and check both the
        # translated output and the ``newlines`` attribute along the way.
        # With an encoding the units come from iterating the encoded data;
        # with ``encoding is None`` the text itself is fed char by char.
        pieces = []
        if encoding is None:
            encoder = None
        else:
            encoder = codecs.getincrementalencoder(encoding)()

        def feed(text):
            units = text if encoder is None else encoder.encode(text)
            for unit in units:
                pieces.append(decoder.decode(unit))

        # (value of decoder.newlines before feeding, text to feed)
        steps = [
            (None, "abc\n\r"),
            ('\n', "\nabc"),
            (('\n', '\r\n'), "abc\r"),
            (('\n', '\r\n'), "abc"),
            (('\r', '\n', '\r\n'), "abc\r"),
        ]
        for newlines_before, text in steps:
            self.assertEqual(decoder.newlines, newlines_before)
            feed(text)
        self.assertEqual("".join(pieces), "abc\n\nabcabc\nabcabc")
        # reset() must also forget the newlines seen so far.
        decoder.reset()
        payload = "abc"
        if encoder is not None:
            encoder.reset()
            payload = encoder.encode(payload)
        self.assertEqual(decoder.decode(payload), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc:
                inner = codecs.getincrementaldecoder(enc)()
            else:
                inner = enc
            decoder = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(decoder, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(decoder)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        # U+0D00 and U+0A00 are not newlines and must pass through without
        # being recorded in ``newlines``.
        for translate in (False, True):
            dec = self.IncrementalNewlineDecoder(None, translate=translate)
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)
2792
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds no tests of its own; every test is inherited from
    # IncrementalNewlineDecoderTest.
    pass
2795
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds no tests of its own; every test is inherited from
    # IncrementalNewlineDecoderTest.
    pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002798
Christian Heimes1a6387e2008-03-26 12:49:49 +00002799
2800# XXX Tests for open()
2801
2802class MiscIOTest(unittest.TestCase):
2803
def tearDown(self):
    # Remove the scratch file that the tests in this class create.
    scratch_path = support.TESTFN
    support.unlink(scratch_path)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002806
Antoine Pitrou19690592009-06-12 20:14:08 +00002807 def test___all__(self):
2808 for name in self.io.__all__:
2809 obj = getattr(self.io, name, None)
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002810 self.assertTrue(obj is not None, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002811 if name == "open":
2812 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002813 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002814 self.assertTrue(issubclass(obj, Exception), name)
2815 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002816 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002817
def test_attributes(self):
    # Check the ``mode`` and ``name`` attributes exposed by each layer of
    # the objects returned by open().  Files are opened with ``with`` so
    # that a failing assertion cannot leave the test file open (tearDown
    # unlinks it).
    with self.open(support.TESTFN, "wb", buffering=0) as f:
        self.assertEqual(f.mode, "wb")

    with self.open(support.TESTFN, "U") as f:
        self.assertEqual(f.name, support.TESTFN)
        self.assertEqual(f.buffer.name, support.TESTFN)
        self.assertEqual(f.buffer.raw.name, support.TESTFN)
        self.assertEqual(f.mode, "U")
        self.assertEqual(f.buffer.mode, "rb")
        self.assertEqual(f.buffer.raw.mode, "rb")

    with self.open(support.TESTFN, "w+") as f:
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # A second object sharing f's descriptor; closefd=False leaves the
        # descriptor to be closed by f.
        with self.open(f.fileno(), "wb", closefd=False) as g:
            self.assertEqual(g.mode, "wb")
            self.assertEqual(g.raw.mode, "wb")
            self.assertEqual(g.name, f.fileno())
            self.assertEqual(g.raw.name, f.fileno())
2844
def test_io_after_close(self):
    # Every I/O method of a closed file object must raise ValueError,
    # whatever mode and buffering it was opened with.
    open_variants = [
        {"mode": "w"},
        {"mode": "wb"},
        {"mode": "w", "buffering": 1},
        {"mode": "w", "buffering": 2},
        {"mode": "wb", "buffering": 0},
        {"mode": "r"},
        {"mode": "rb"},
        {"mode": "r", "buffering": 1},
        {"mode": "r", "buffering": 2},
        {"mode": "rb", "buffering": 0},
        {"mode": "w+"},
        {"mode": "w+b"},
        {"mode": "w+", "buffering": 1},
        {"mode": "w+", "buffering": 2},
        {"mode": "w+b", "buffering": 0},
    ]
    for kwargs in open_variants:
        f = self.open(support.TESTFN, **kwargs)
        f.close()
        # write() needs an argument of the type matching the mode.
        empty = b"" if "b" in kwargs['mode'] else ""
        # (method name, call arguments, True if only some objects have it)
        checks = [
            ("flush", (), False),
            ("fileno", (), False),
            ("isatty", (), False),
            ("__iter__", (), False),
            ("peek", (1,), True),
            ("read", (), False),
            ("read1", (1024,), True),
            ("readall", (), True),
            ("readinto", (bytearray(1024),), True),
            ("readline", (), False),
            ("readlines", (), False),
            ("seek", (0,), False),
            ("tell", (), False),
            ("truncate", (), False),
            ("write", (empty,), False),
            ("writelines", ([],), False),
        ]
        for method_name, args, optional in checks:
            if optional and not hasattr(f, method_name):
                continue
            self.assertRaises(ValueError, getattr(f, method_name), *args)
        self.assertRaises(ValueError, next, f)
2887
def test_blockingioerror(self):
    # Various BlockingIOError issues

    # Wrong number or type of constructor arguments.
    for bad_args in ((), (1,), (1, 2, 3, 4), (1, "", None)):
        self.assertRaises(TypeError, self.BlockingIOError, *bad_args)

    # characters_written defaults to zero.
    err = self.BlockingIOError(1, "")
    self.assertEqual(err.characters_written, 0)

    # Build a reference cycle between the exception and its message and
    # check that a garbage collection pass reclaims it.
    class C(unicode):
        pass
    message = C("")
    err = self.BlockingIOError(1, message)
    message.b = err
    err.c = message
    wr = weakref.ref(message)
    del message, err
    support.gc_collect()
    self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002906
def test_abcs(self):
    # Each of the visible base classes must be an abstract base class,
    # i.e. an instance of abc.ABCMeta.
    for base_name in ("IOBase", "RawIOBase", "BufferedIOBase", "TextIOBase"):
        self.assertIsInstance(getattr(self, base_name), abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002913
def _check_abc_inheritance(self, abcmodule):
    """Check which ABCs of *abcmodule* each kind of opened file matches.

    A file is opened unbuffered-binary, buffered-binary and as text; each
    one must be an IOBase and an instance of exactly one of RawIOBase,
    BufferedIOBase and TextIOBase.
    """
    layers = ("RawIOBase", "BufferedIOBase", "TextIOBase")
    # (open mode, extra open() keywords, the one layer ABC expected to match)
    scenarios = (
        ("wb", {"buffering": 0}, "RawIOBase"),
        ("wb", {}, "BufferedIOBase"),
        ("w", {}, "TextIOBase"),
    )
    for mode, extra, expected_layer in scenarios:
        with self.open(support.TESTFN, mode, **extra) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            for layer in layers:
                layer_abc = getattr(abcmodule, layer)
                if layer == expected_layer:
                    self.assertIsInstance(f, layer_abc)
                else:
                    self.assertNotIsInstance(f, layer_abc)
Antoine Pitrou19690592009-06-12 20:14:08 +00002930
def test_abc_inheritance(self):
    # Implementations must inherit from their respective ABCs.  The ABCs
    # are looked up on the test case itself.
    abc_namespace = self
    self._check_abc_inheritance(abc_namespace)
2934
def test_abc_inheritance_official(self):
    # Implementations must also inherit from the official ABCs exposed by
    # the baseline "io" module.
    official_abcs = io
    self._check_abc_inheritance(official_abcs)
2939
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_bigbuf(self):
    # Run the non-blocking pipe scenario with a 16 KiB buffer.
    self._test_nonblock_pipe_write(bufsize=16 * 1024)
2943
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_smallbuf(self):
    # Run the non-blocking pipe scenario with a 1 KiB buffer.
    self._test_nonblock_pipe_write(bufsize=1024)
2947
def _set_non_blocking(self, fd):
    """Add O_NONBLOCK to the status flags of file descriptor *fd*."""
    current_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    self.assertNotEqual(current_flags, -1)
    status = fcntl.fcntl(fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
    self.assertEqual(status, 0)
2953
def _test_nonblock_pipe_write(self, bufsize):
    """Exercise buffered writes to a non-blocking pipe.

    Data is written through a buffered writer of size *bufsize* until the
    pipe blocks; the reader side is then drained and writing resumes.  At
    the end, the bytes read back must equal the bytes that the writer
    reported as written, and both file objects must be closed.
    """
    sent = []
    received = []
    r, w = os.pipe()
    self._set_non_blocking(r)
    self._set_non_blocking(w)

    # To exercise all code paths in the C implementation we need
    # to play with buffer sizes.  For instance, if we choose a
    # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
    # then we will never get a partial write of the buffer.
    rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
    wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

    with rf, wf:
        for N in 9999, 73, 7574:
            try:
                i = 0
                while True:
                    # Build a single-byte string that is repeated N times.
                    # Going through bytearray is required on Python 2,
                    # where bytes is str and bytes([97]) would be the
                    # four-character text "[97]" instead of b"a".
                    msg = bytes(bytearray([i % 26 + 97])) * N
                    sent.append(msg)
                    wf.write(msg)
                    i += 1

            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                # Only part of the last message was accepted.
                sent[-1] = sent[-1][:e.characters_written]
                received.append(rf.read())
                msg = b'BLOCKED'
                wf.write(msg)
                sent.append(msg)

        # Flush whatever is still buffered, draining the pipe each time
        # the flush would block.
        while True:
            try:
                wf.flush()
                break
            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.characters_written, 0)
                received.append(rf.read())

        # Read until the non-blocking reader reports "no data" (None).
        received += iter(rf.read, None)

    sent, received = b''.join(sent), b''.join(received)
    # Deliberately not assertEqual: the payloads are large and would be
    # dumped in full in the failure message.
    self.assertTrue(sent == received)
    self.assertTrue(wf.closed)
    self.assertTrue(rf.closed)
3001
class CMiscIOTest(MiscIOTest):
    # Run the MiscIOTest suite against the "io" module.
    io = io
3004
class PyMiscIOTest(MiscIOTest):
    # Run the MiscIOTest suite against the "pyio" module.
    io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00003007
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003008
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Tests of I/O operations that get interrupted by SIGALRM.

    The module under test is read from the ``io`` class attribute, which
    subclasses are expected to provide.
    """

    def setUp(self):
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        # Safety net: never leave an alarm armed once the previous handler
        # is back in place, whatever happened in the test.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Raise ZeroDivisionError from inside the signal handler.
        1 // 0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
                     'issue #12429: skip test on FreeBSD <= 7')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter."""
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        # Bound up front so the cleanup below works even if open() fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            self.assertRaises(ZeroDivisionError,
                        wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            # Make sure no alarm can fire during or after the cleanup.
            signal.alarm(0)
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check the outcome of a write() issued from a signal handler
        while the same object is already inside write() or flush()."""
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1//0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Make sure on_alarm() cannot run against a closed file.
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully."""
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound up front so the cleanup below works even if open() fails.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # The handler writes to w, so it must not run once w is closed.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupterd_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupterd_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully."""
        select = support.import_module("select")
        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        # One-element list so that the reader thread can report a failure.
        error = [None]
        def _read():
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                error[0] = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            t.start()
        signal.signal(signal.SIGALRM, alarm1)
        # Bound up front so the cleanup below works even if open() fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            self.assertEqual(N, wio.write(item * N))
            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error[0])
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Neither alarm handler may run once the descriptors are closed.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupterd_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupterd_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3202
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003203
class CSignalsTest(SignalsTest):
    # Run the signal tests against the "io" module.
    io = io
3206
class PySignalsTest(SignalsTest):
    # Run the signal tests against the "pyio" module.
    io = pyio

    # The reentrancy tests are switched off here: handling reentrant
    # calls would slow down _pyio even more.
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None
3214
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003215
def test_main():
    """Give every test class its I/O namespace, then run all of them."""
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Each test class gets, as class attributes, the public names of the
    # IO module it exercises plus the matching flavour of the mock classes.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}

    # The flavoured mocks are module globals named "C<Mock>" / "Py<Mock>".
    module_globals = globals()
    for mock in mocks:
        c_io_ns[mock.__name__] = module_globals["C" + mock.__name__]
    for mock in mocks:
        py_io_ns[mock.__name__] = module_globals["Py" + mock.__name__]

    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    for test in tests:
        class_name = test.__name__
        if class_name.startswith("C"):
            namespace = c_io_ns
        elif class_name.startswith("Py"):
            namespace = py_io_ns
        else:
            # Not tied to either implementation: nothing to inject.
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    support.run_unittest(*tests)


if __name__ == "__main__":
    test_main()