blob: 56f403356c1f35df4e03edb4f68187fba536c1e0 [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +030032import warnings
Antoine Pitrou19690592009-06-12 20:14:08 +000033import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000034import signal
35import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000036from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000037from collections import deque
Antoine Pitrou78e761e2012-10-16 22:57:11 +020038from UserList import UserList
Antoine Pitrou19690592009-06-12 20:14:08 +000039from test import test_support as support
Serhiy Storchaka354d50e2013-02-03 17:10:42 +020040import contextlib
Christian Heimes1a6387e2008-03-26 12:49:49 +000041
42import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000043import io # C implementation of io
44import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000045try:
46 import threading
47except ImportError:
48 threading = None
Antoine Pitrou5aa7df32011-11-21 20:16:44 +010049try:
50 import fcntl
51except ImportError:
52 fcntl = None
Antoine Pitrou19690592009-06-12 20:14:08 +000053
54__metaclass__ = type
55bytes = support.py3k_bytes
56
def _default_chunk_size():
    """Return the chunk size a freshly opened TextIOWrapper reports."""
    # This module's own source is used simply because it is a text file
    # that is known to exist.
    stream = io.open(__file__, "r", encoding="latin1")
    try:
        chunk_size = stream._CHUNK_SIZE
    finally:
        stream.close()
    return chunk_size
Christian Heimes1a6387e2008-03-26 12:49:49 +000061
62
class MockRawIOWithoutRead:
    """Scripted raw stream that deliberately provides no read() method.

    Reads are served from a queue of chunks through readinto() only, so a
    RawIOBase subclass built on this mixin falls back to the default
    RawIOBase.read().  A ``None`` entry in the queue makes readinto() return
    ``None``.  Written data is recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        # Bookkeeping counters first, then the scripted data.
        self._reads = 0
        self._extraneous_reads = 0
        self._write_stack = []
        self._read_stack = list(read_stack)

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def fileno(self):
        return 42

    def seek(self, pos, whence):
        # wrong but we gotta return something
        return 0

    def tell(self):
        # just as wrong as seek(), for the same reason
        return 0

    def truncate(self, pos=None):
        return pos

    def write(self, b):
        """Record a copy of *b* and report all of it as written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def readinto(self, buf):
        """Copy the next queued chunk (or as much as fits) into *buf*.

        Returns the number of bytes stored, ``None`` when the next queue
        entry is ``None``, or 0 once the queue is exhausted (such calls are
        also counted in ``_extraneous_reads``).
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            del self._read_stack[0]
            return None
        size = len(chunk)
        if size > capacity:
            # Hand out what fits and keep the remainder queued.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        del self._read_stack[0]
        buf[:size] = chunk
        return size
118
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead on top of the C implementation's RawIOBase."""
121
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead on top of the Python implementation's RawIOBase."""
124
125
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus an explicit read() serving whole chunks."""

    def read(self, n=None):
        """Return the next queued chunk; *n* is ignored.

        Every call is counted in ``_reads``.  When the queue is empty the
        call is also counted in ``_extraneous_reads`` and ``b""`` is returned.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue means "no more data".  Anything else
            # (KeyboardInterrupt, a genuine bug) must not be disguised as EOF.
            self._extraneous_reads += 1
            return b""
135
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO on top of the C implementation's RawIOBase."""
138
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO on top of the Python implementation's RawIOBase."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000141
142
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods report deliberately bogus results."""

    def write(self, b):
        # Claim twice as many bytes as were actually recorded.
        written = super(MisbehavedRawIO, self).write(b)
        return written * 2

    def read(self, n=None):
        # Hand back the chunk duplicated.
        chunk = super(MisbehavedRawIO, self).read(n)
        return chunk * 2

    def seek(self, pos, whence):
        # A negative position.
        return -123

    def tell(self):
        # A negative position.
        return -456

    def readinto(self, buf):
        # Perform the real transfer, then report five times the buffer size.
        super(MisbehavedRawIO, self).readinto(buf)
        return len(buf) * 5
159
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO on top of the C implementation's RawIOBase."""
162
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO on top of the Python implementation's RawIOBase."""
165
166
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() marks it closed and then raises IOError."""

    closed = 0

    def close(self):
        if self.closed:
            # Already closed: later calls are silent no-ops.
            return
        self.closed = 1
        raise IOError
174
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO on top of the C implementation's RawIOBase."""
177
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO on top of the Python implementation's RawIOBase."""
180
181
class MockFileIO:
    """Mixin that logs the size of every read in ``read_history``.

    The actual I/O is delegated to the next class in the MRO through
    super(), so this must be combined with a real stream class.
    """

    def __init__(self, data):
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        """Delegate to the base read() and log the result length (or None)."""
        chunk = super(MockFileIO, self).read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        """Delegate to the base readinto() and log its return value."""
        count = super(MockFileIO, self).readinto(b)
        self.read_history.append(count)
        return count
Christian Heimes1a6387e2008-03-26 12:49:49 +0000197
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO backed by the C implementation's BytesIO."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000200
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO backed by the Python implementation's BytesIO."""
203
204
class MockNonBlockWriterIO:
    """Raw writer mock that can simulate a write that would block.

    Written data accumulates in ``_write_stack``.  After block_on(char), a
    write containing *char* is cut short just before it; a write that starts
    with *char* returns ``None`` and disarms the blocker.
    """

    def __init__(self):
        self._blocker_char = None
        self._write_stack = []

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def block_on(self, char):
        """Arm the blocker: stop writing when *char* is encountered."""
        self._blocker_char = char

    def pop_written(self):
        """Return everything written so far and clear the record."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def write(self, b):
        """Record *b*, honouring an armed blocker; return the count or None."""
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            pos = payload.find(blocker)
            if pos > 0:
                # write data up to the first blocker
                self._write_stack.append(payload[:pos])
                return pos
            if pos == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(payload)
        return len(payload)
248
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO on top of the C implementation's RawIOBase."""

    # Exception class belonging to the same implementation.
    BlockingIOError = io.BlockingIOError
251
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO on top of the Python implementation's RawIOBase."""

    # Exception class belonging to the same implementation.
    BlockingIOError = pyio.BlockingIOError
254
Christian Heimes1a6387e2008-03-26 12:49:49 +0000255
class IOTest(unittest.TestCase):
    """General tests for opening, reading, writing and closing streams.

    The implementation under test is reached through attributes such as
    ``self.open``, ``self.FileIO``, ``self.BytesIO``, ``self.IOBase`` and
    ``self.MockRawIOWithoutRead``.  They are not defined in this class and
    are expected to be supplied from outside it.
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        # Exercise write/seek/tell/truncate on the writable stream f.
        # truncate() must not move the stream position.
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Exercise read/readinto/seek/tell on a stream f holding
        # b"hello world\n".  The argument-less read() calls are only made
        # when buffered is true.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by large_file_ops().
    LARGE = 2**31

    def large_file_ops(self, f):
        # Seek, write, truncate and read around the LARGE offset.
        # unittest assertions are used instead of bare assert statements so
        # that the preconditions are still checked under "python -O".
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_raw_file_io(self):
        # Unbuffered binary file: capability flags, then the shared checks.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        # Same as test_raw_file_io, with the default buffering.
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        # readline() with and without a size limit, and with a bad limit.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        # Run the shared write and read checks against an in-memory stream.
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            support.requires(
                'largefile',
                'test requires %s bytes and a long time to run' % self.LARGE)
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The with statement closes the file on normal exit and on error.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1 // 0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1 // 0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Destroying a FileIO subclass runs __del__, close(), flush() in
        # that order, and the written data reaches the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyFileIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyFileIO, self).close()
            def flush(self):
                record.append(3)
                super(MyFileIO, self).flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        # Subclass the given base and check that destroying an instance
        # runs __del__, close(), flush() in that order.
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super(MyIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super(MyIO, self).close()
            def flush(self):
                record.append(self.on_flush)
                super(MyIO, self).flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        # Data written before the with block exits is readable afterwards.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() of an array reports the byte length, not the item count.
        a = array.array(b'i', range(10))
        n = len(a.tostring())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        # Reading from a closed closefd=False wrapper raises ValueError.
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            # Named "wrapper" rather than "file" to avoid shadowing the
            # Python 2 builtin.
            wrapper = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(wrapper.read(), "egg\n")
            wrapper.seek(0)
            wrapper.close()
            self.assertRaises(ValueError, wrapper.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        # The raw layer's closefd attribute reflects how the file was opened.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            wrapper = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(wrapper.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def check_flush_error_on_close(self, *args, **kwargs):
        # Test that the file is closed despite failed flush
        # and that flush() is called before file closed.
        f = self.open(*args, **kwargs)
        closed = []
        def bad_flush():
            closed[:] = [f.closed]
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed
        self.assertTrue(f.closed)
        self.assertTrue(closed)      # flush() called
        self.assertFalse(closed[0])  # flush() called before file closed

    def _check_flush_error_on_borrowed_fd(self, mode, **kwargs):
        # Run check_flush_error_on_close() on a descriptor the stream does
        # not own (closefd=False).  The descriptor is released here even
        # when the check fails, so a failing run does not leak it.
        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
        try:
            self.check_flush_error_on_close(fd, mode, closefd=False, **kwargs)
        finally:
            os.close(fd)

    def test_flush_error_on_close(self):
        # raw file
        # Issue #5700: io.FileIO calls flush() after file closed
        self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
        self.check_flush_error_on_close(fd, 'wb', buffering=0)
        self._check_flush_error_on_borrowed_fd('wb', buffering=0)
        # buffered io
        self.check_flush_error_on_close(support.TESTFN, 'wb')
        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
        self.check_flush_error_on_close(fd, 'wb')
        self._check_flush_error_on_borrowed_fd('wb')
        # text io
        self.check_flush_error_on_close(support.TESTFN, 'w')
        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
        self.check_flush_error_on_close(fd, 'w')
        self._check_flush_error_on_borrowed_fd('w')

    def test_multi_close(self):
        # close() may be called repeatedly; flush() afterwards raises.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()

    def test_nonbuffered_textio(self):
        # Unbuffered text mode is rejected without emitting any warning.
        with warnings.catch_warnings(record=True) as recorded:
            with self.assertRaises(ValueError):
                self.open(support.TESTFN, 'w', buffering=0)
            support.gc_collect()
        self.assertEqual(recorded, [])

    def test_invalid_newline(self):
        # An invalid newline argument is rejected without any warning.
        with warnings.catch_warnings(record=True) as recorded:
            with self.assertRaises(ValueError):
                self.open(support.TESTFN, 'w', newline='invalid')
            support.gc_collect()
        self.assertEqual(recorded, [])
650
Hynek Schlawack877effc2012-05-25 09:24:18 +0200651
class CIOTest(IOTest):
    """IOTest plus a check that is specific to this subclass."""

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        obj.obj = obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        support.gc_collect()
        # assertIsNone reports the surviving object itself on failure.
        self.assertIsNone(wr(), wr)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000671
class PyIOTest(IOTest):
    """IOTest with the array-write check skipped."""

    @unittest.skip(
        "len(array.array) returns number of elements rather than bytelength"
    )
    def test_array_writes(self):
        IOTest.test_array_writes(self)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000676
677
Antoine Pitrou19690592009-06-12 20:14:08 +0000678class CommonBufferedTests:
679 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
680
681 def test_detach(self):
682 raw = self.MockRawIO()
683 buf = self.tp(raw)
684 self.assertIs(buf.detach(), raw)
685 self.assertRaises(ValueError, buf.detach)
686
Benjamin Peterson53ae6142014-12-21 20:51:50 -0600687 repr(buf) # Should still work
688
Antoine Pitrou19690592009-06-12 20:14:08 +0000689 def test_fileno(self):
690 rawio = self.MockRawIO()
691 bufio = self.tp(rawio)
692
Ezio Melotti2623a372010-11-21 13:34:58 +0000693 self.assertEqual(42, bufio.fileno())
Antoine Pitrou19690592009-06-12 20:14:08 +0000694
Zachary Ware1f702212013-12-10 14:09:20 -0600695 @unittest.skip('test having existential crisis')
Antoine Pitrou19690592009-06-12 20:14:08 +0000696 def test_no_fileno(self):
697 # XXX will we always have fileno() function? If so, kill
698 # this test. Else, write it.
699 pass
700
701 def test_invalid_args(self):
702 rawio = self.MockRawIO()
703 bufio = self.tp(rawio)
704 # Invalid whence
705 self.assertRaises(ValueError, bufio.seek, 0, -1)
706 self.assertRaises(ValueError, bufio.seek, 0, 3)
707
708 def test_override_destructor(self):
709 tp = self.tp
710 record = []
711 class MyBufferedIO(tp):
712 def __del__(self):
713 record.append(1)
714 try:
715 f = super(MyBufferedIO, self).__del__
716 except AttributeError:
717 pass
718 else:
719 f()
720 def close(self):
721 record.append(2)
722 super(MyBufferedIO, self).close()
723 def flush(self):
724 record.append(3)
725 super(MyBufferedIO, self).flush()
726 rawio = self.MockRawIO()
727 bufio = MyBufferedIO(rawio)
728 writable = bufio.writable()
729 del bufio
730 support.gc_collect()
731 if writable:
732 self.assertEqual(record, [1, 2, 3])
733 else:
734 self.assertEqual(record, [1, 2])
735
736 def test_context_manager(self):
737 # Test usability as a context manager
738 rawio = self.MockRawIO()
739 bufio = self.tp(rawio)
740 def _with():
741 with bufio:
742 pass
743 _with()
744 # bufio should now be closed, and using it a second time should raise
745 # a ValueError.
746 self.assertRaises(ValueError, _with)
747
748 def test_error_through_destructor(self):
749 # Test that the exception state is not modified by a destructor,
750 # even if close() fails.
751 rawio = self.CloseFailureIO()
752 def f():
753 self.tp(rawio).xyzzy
754 with support.captured_output("stderr") as s:
755 self.assertRaises(AttributeError, f)
756 s = s.getvalue().strip()
757 if s:
758 # The destructor *may* have printed an unraisable error, check it
759 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000760 self.assertTrue(s.startswith("Exception IOError: "), s)
761 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000762
763 def test_repr(self):
764 raw = self.MockRawIO()
765 b = self.tp(raw)
766 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
767 self.assertEqual(repr(b), "<%s>" % clsname)
768 raw.name = "dummy"
769 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
770 raw.name = b"dummy"
771 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000772
def test_flush_error_on_close(self):
    # close() must still close both layers when flush() fails, and it
    # must call flush() while both of them are still open.
    underlying = self.MockRawIO()
    seen_at_flush = []

    def failing_flush():
        # Remember the "closed" flags as observed from inside flush().
        seen_at_flush[:] = [buffered.closed, underlying.closed]
        raise IOError()

    underlying.flush = failing_flush
    buffered = self.tp(underlying)
    self.assertRaises(IOError, buffered.close)  # exception not swallowed
    self.assertTrue(buffered.closed)
    self.assertTrue(underlying.closed)
    self.assertTrue(seen_at_flush)  # flush() called
    buffered_was_closed, raw_was_closed = seen_at_flush
    self.assertFalse(buffered_was_closed)  # flush() called before close
    self.assertFalse(raw_was_closed)
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600789
def test_close_error_on_close(self):
    # Both flush() and the raw close() fail; the error from close() is
    # the one that must reach the caller.
    def failing(tag):
        def _raise():
            raise IOError(tag)
        return _raise

    underlying = self.MockRawIO()
    underlying.close = failing('close')
    buffered = self.tp(underlying)
    buffered.flush = failing('flush')
    with self.assertRaises(IOError) as caught:  # exception not swallowed
        buffered.close()
    self.assertEqual(caught.exception.args, ('close',))
    # The raw close() failed, so the object is not reported as closed.
    self.assertFalse(buffered.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000803
def test_multi_close(self):
    # Calling close() repeatedly is allowed; using the object afterwards
    # is not.
    buffered = self.tp(self.MockRawIO())
    for _ in range(3):
        buffered.close()
    self.assertRaises(ValueError, buffered.flush)
811
def test_readonly_attributes(self):
    # The ``raw`` attribute cannot be rebound to another stream.
    buffered = self.tp(self.MockRawIO())
    replacement = self.MockRawIO()
    with self.assertRaises((AttributeError, TypeError)):
        buffered.raw = replacement
818
Christian Heimes1a6387e2008-03-26 12:49:49 +0000819
class SizeofTest:
    # Mixin: sys.getsizeof() of a buffered object must grow by exactly the
    # amount the buffer size grows.

    @support.cpython_only
    def test_sizeof(self):
        small, large = 4096, 8192
        first = self.tp(self.MockRawIO(), buffer_size=small)
        # What remains once the buffer itself is subtracted.
        overhead = sys.getsizeof(first) - small
        second = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(second), overhead + large)
832
833
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Tests for the buffered reader type bound to ``self.tp``.
    read_mode = "rb"

    def test_constructor(self):
        # __init__ may be called again on a live object, with or without
        # an explicit buffer size.
        raw = self.MockRawIO([b"abc"])
        reader = self.tp(raw)
        reader.__init__(raw)
        for size in (1024, 16):
            reader.__init__(raw, buffer_size=size)
        self.assertEqual(b"abc", reader.read())
        # Zero and negative buffer sizes are rejected.
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, reader.__init__, raw,
                              buffer_size=bad_size)
        # A later valid __init__ makes the object usable again.
        raw = self.MockRawIO([b"abc"])
        reader.__init__(raw)
        self.assertEqual(b"abc", reader.read())

    def test_uninitialized(self):
        # Create and drop an object that never went through __init__.
        orphan = self.tp.__new__(self.tp)
        del orphan
        reader = self.tp.__new__(self.tp)
        # Reading before __init__ must fail with one of these errors.
        self.assertRaisesRegexp((ValueError, AttributeError),
                                'uninitialized|has no attribute',
                                reader.read, 0)
        reader.__init__(self.MockRawIO())
        self.assertEqual(reader.read(0), b'')

    def test_read(self):
        # read(None) and read(7) both return all seven bytes.
        for size in (None, 7):
            raw = self.MockRawIO((b"abc", b"d", b"efg"))
            reader = self.tp(raw)
            self.assertEqual(b"abcdefg", reader.read(size))
        # Invalid args
        self.assertRaises(ValueError, reader.read, -2)

    def test_read1(self):
        raw = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        self.assertEqual(b"a", reader.read(1))
        # Each row: expected data, size passed to read1(), and the number
        # of raw reads counted by the mock after the call.
        steps = (
            (b"b", 1, 1),
            (b"c", 100, 1),
            (b"d", 100, 2),
            (b"efg", 100, 3),
            (b"", 100, 4),
        )
        for expected, size, raw_reads in steps:
            self.assertEqual(expected, reader.read1(size))
            self.assertEqual(raw._reads, raw_reads)
        # Invalid args
        self.assertRaises(ValueError, reader.read1, -1)

    def test_readinto(self):
        raw = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        target = bytearray(2)
        # Each row: byte count returned by readinto() and the contents of
        # the two-byte target afterwards.
        steps = (
            (2, b"ab"),
            (2, b"cd"),
            (2, b"ef"),
            (1, b"gf"),
            (0, b"gf"),
        )
        for count_read, contents in steps:
            self.assertEqual(reader.readinto(target), count_read)
            self.assertEqual(target, contents)

    def test_readlines(self):
        def fresh_reader():
            return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))

        # readlines() without a hint, with a hint of 5, and with None.
        cases = (
            ((), [b"abc\n", b"d\n", b"ef"]),
            ((5,), [b"abc\n", b"d\n"]),
            ((None,), [b"abc\n", b"d\n", b"ef"]),
        )
        for args, expected in cases:
            self.assertEqual(fresh_reader().readlines(*args), expected)

    def test_buffering(self):
        data = b"abcdefghi"
        total = len(data)

        # Each row: buffer size, sizes passed to the buffered read(), and
        # the read history the raw mock is expected to record.
        scenarios = (
            (100, (3, 1, 4, 8), [total, 0]),
            (100, (3, 3, 3), [total]),
            (4, (1, 2, 4, 2), [4, 4, 1]),
        )

        for bufsize, buffered_sizes, raw_sizes in scenarios:
            raw = self.MockFileIO(data)
            reader = self.tp(raw, buffer_size=bufsize)
            offset = 0
            for nbytes in buffered_sizes:
                self.assertEqual(reader.read(nbytes),
                                 data[offset:offset + nbytes])
                offset += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(raw.read_history, raw_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        raw = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        reader = self.tp(raw)
        self.assertEqual(b"abcd", reader.read(6))
        self.assertEqual(b"e", reader.read(1))
        self.assertEqual(b"fg", reader.read())
        self.assertEqual(b"", reader.peek(1))
        self.assertIsNone(reader.read())
        self.assertEqual(b"", reader.read())

        # The same expectation for readall() on the raw mock itself.
        raw = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", raw.readall())
        self.assertIsNone(raw.readall())

    def test_read_past_eof(self):
        # Asking for far more than is available returns what there is.
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", reader.read(9000))

    def test_read_all(self):
        # read() without a size returns everything.
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", reader.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                reader = self.tp(raw, 8)
                errors = []
                results = []

                def worker():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        sizes = cycle([1, 19])
                        while True:
                            chunk = reader.read(next(sizes))
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise

                threads = [threading.Thread(target=worker) for x in range(20)]
                for thread in threads:
                    thread.start()
                time.sleep(0.02)  # yield
                for thread in threads:
                    thread.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                # Every byte value must have been read exactly N times.
                for value in range(256):
                    needle = bytes(bytearray([value]))
                    self.assertEqual(combined.count(needle), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # seek() and tell() on top of the misbehaving raw mock raise IOError.
        reader = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
        self.assertRaises(IOError, reader.seek, 0)
        self.assertRaises(IOError, reader.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16

        def check(chunks, n):
            raw = self.MockRawIO(chunks)
            reader = self.tp(raw, bufsize)
            self.assertEqual(reader.read(n), b"x" * n)
            self.assertEqual(raw._extraneous_reads, 0,
                "failed for {}: {} != 0".format(n, raw._extraneous_reads))

        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            # Simple case: one raw read is enough to satisfy the request.
            check([b"x" * n], n)
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            check([b"x" * (n - 1), b"x"], n)
1025
1026
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the shared reader tests against the C implementation and adds
    # checks that only apply to it.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a rejected __init__ the object is expected to refuse
        # read() with ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The FileIO below creates TESTFN; remove it when the test is over.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Reference cycle, so only the cyclic collector can free the object.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError message has to
        # mention "BufferedReader".
        with self.assertRaisesRegexp(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1072
1073
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the inherited BufferedReaderTest tests against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001076
1077
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Tests for the buffered writer type bound to ``self.tp``.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be called again on a live object; invalid buffer
        # sizes are rejected with ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_uninitialized(self):
        # Create and drop an object that never went through __init__.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        # Writing before __init__ must fail with one of these errors.
        self.assertRaisesRegexp((ValueError, AttributeError),
                                'uninitialized|has no attribute',
                                bufio.write, b'')
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.write(b''), 0)

    def test_detach_flush(self):
        # detach() pushes the buffered data down to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # ``intermediate_func`` is called with the buffered object after
        # every single write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Never write past the end of ``contents``.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents,
            b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seek away and back (absolute, then relative) between writes.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        with self.assertRaises(self.BlockingIOError) as cm:
            bufio.write(b"opqrwxyz0123456789")
        self.assertEqual(cm.exception.characters_written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Overwrite the start after seeking back, then append at the end.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # Same as test_writelines, but the lines come in a UserList.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        # Non-bytes items and a non-iterable argument raise TypeError.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)

    def test_destructor(self):
        # Dropping the last reference flushes the pending data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates TESTFN; remove it when the test is over.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            # Cut the payload into alternating 1- and 19-byte pieces.
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Every byte value must have been written exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # seek(), tell() and an overflowing write() on top of the
        # misbehaving raw mock raise IOError.
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument triggers the warning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        # A failing raw write() during close() propagates, and the object
        # still ends up closed.
        raw = self.MockRawIO()
        def bad_write(b):
            raise IOError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(IOError, b.close) # exception not swallowed
        self.assertTrue(b.closed)
1336
Antoine Pitrou19690592009-06-12 20:14:08 +00001337
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared writer tests against the C implementation and adds
    # checks that only apply to it.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a rejected __init__ the object is expected to refuse
        # write() with ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The FileIO below creates TESTFN; remove it when the test is over.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Reference cycle, so only the cyclic collector can free the object.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError message has to
        # mention "BufferedWriter".
        with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1380
Antoine Pitrou19690592009-06-12 20:14:08 +00001381
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the inherited BufferedWriterTest tests against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001384
1385class BufferedRWPairTest(unittest.TestCase):
1386
Antoine Pitrou19690592009-06-12 20:14:08 +00001387 def test_constructor(self):
1388 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001389 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001390
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001391 def test_uninitialized(self):
1392 pair = self.tp.__new__(self.tp)
1393 del pair
1394 pair = self.tp.__new__(self.tp)
1395 self.assertRaisesRegexp((ValueError, AttributeError),
1396 'uninitialized|has no attribute',
1397 pair.read, 0)
1398 self.assertRaisesRegexp((ValueError, AttributeError),
1399 'uninitialized|has no attribute',
1400 pair.write, b'')
1401 pair.__init__(self.MockRawIO(), self.MockRawIO())
1402 self.assertEqual(pair.read(0), b'')
1403 self.assertEqual(pair.write(b''), 0)
1404
Antoine Pitrou19690592009-06-12 20:14:08 +00001405 def test_detach(self):
1406 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1407 self.assertRaises(self.UnsupportedOperation, pair.detach)
1408
1409 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001410 with support.check_warnings(("max_buffer_size is deprecated",
1411 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001412 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001413
1414 def test_constructor_with_not_readable(self):
1415 class NotReadable(MockRawIO):
1416 def readable(self):
1417 return False
1418
1419 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1420
1421 def test_constructor_with_not_writeable(self):
1422 class NotWriteable(MockRawIO):
1423 def writable(self):
1424 return False
1425
1426 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1427
1428 def test_read(self):
1429 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1430
1431 self.assertEqual(pair.read(3), b"abc")
1432 self.assertEqual(pair.read(1), b"d")
1433 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001434 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1435 self.assertEqual(pair.read(None), b"abc")
1436
1437 def test_readlines(self):
1438 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1439 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1440 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1441 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001442
1443 def test_read1(self):
1444 # .read1() is delegated to the underlying reader object, so this test
1445 # can be shallow.
1446 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1447
1448 self.assertEqual(pair.read1(3), b"abc")
1449
1450 def test_readinto(self):
1451 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1452
1453 data = bytearray(5)
1454 self.assertEqual(pair.readinto(data), 5)
1455 self.assertEqual(data, b"abcde")
1456
1457 def test_write(self):
1458 w = self.MockRawIO()
1459 pair = self.tp(self.MockRawIO(), w)
1460
1461 pair.write(b"abc")
1462 pair.flush()
1463 pair.write(b"def")
1464 pair.flush()
1465 self.assertEqual(w._write_stack, [b"abc", b"def"])
1466
1467 def test_peek(self):
1468 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1469
1470 self.assertTrue(pair.peek(3).startswith(b"abc"))
1471 self.assertEqual(pair.read(3), b"abc")
1472
1473 def test_readable(self):
1474 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1475 self.assertTrue(pair.readable())
1476
def test_writeable(self):
    # A pair built from two mock raw objects reports itself writable.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    self.assertTrue(self.tp(reader, writer).writable())
1480
def test_seekable(self):
    # A BufferedRWPair is never seekable, regardless of whether its
    # reader and writer are.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    self.assertFalse(self.tp(reader, writer).seekable())
1486
1487 # .flush() is delegated to the underlying writer object and has been
1488 # tested in the test_write method.
1489
def test_close_and_closed(self):
    # ``closed`` goes from false to true across a close() call.
    pair = self.tp(self.MockRawIO(), self.MockRawIO())
    closed_before = pair.closed
    self.assertFalse(closed_before)
    pair.close()
    closed_after = pair.closed
    self.assertTrue(closed_after)
1495
def test_isatty(self):
    # Raw mock whose isatty() answer is fixed at construction time.
    class SelectableIsAtty(MockRawIO):
        def __init__(self, isatty):
            MockRawIO.__init__(self)
            self._isatty = isatty

        def isatty(self):
            return self._isatty

    # (reader is a tty, writer is a tty, expected answer of the pair):
    # the pair counts as a tty as soon as either side is one.
    truth_table = (
        (False, False, False),
        (True, False, True),
        (False, True, True),
        (True, True, True),
    )
    for reader_tty, writer_tty, expected in truth_table:
        pair = self.tp(SelectableIsAtty(reader_tty),
                       SelectableIsAtty(writer_tty))
        check = self.assertTrue if expected else self.assertFalse
        check(pair.isatty())
1516
def test_weakref_clearing(self):
    # Drop the pair first and its weak reference second; the test passes
    # as long as the interpreter survives both steps.
    pair = self.tp(self.MockRawIO(), self.MockRawIO())
    pair_ref = weakref.ref(pair)
    pair = None
    pair_ref = None  # Shouldn't segfault.
1522
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the BufferedRWPairTest suite against io.BufferedRWPair.
    tp = io.BufferedRWPair
1525
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the BufferedRWPairTest suite against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001528
1529
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    """Tests for a buffered object that is read and written via one handle.

    Concrete subclasses bind ``tp`` to the implementation under test.  Tests
    defined by both parent suites are re-run here once per parent.
    """

    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_constructor(self)

    def test_uninitialized(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_uninitialized(self)

    def test_read_and_write(self):
        mock_raw = self.MockRawIO((b"asdf", b"ghjk"))
        bufio = self.tp(mock_raw, 8)

        self.assertEqual(b"as", bufio.read(2))
        for piece in (b"ddd", b"eee"):
            bufio.write(piece)
        # The writes are still buffered: nothing reached the raw object.
        self.assertFalse(mock_raw._write_stack)
        self.assertEqual(b"ghjk", bufio.read())
        # By now both writes have arrived as one combined raw write.
        self.assertEqual(b"dddeee", mock_raw._write_stack[0])

    def test_seek_and_tell(self):
        backing = self.BytesIO(b"asdfghjkl")
        bufio = self.tp(backing)

        self.assertEqual(b"as", bufio.read(2))
        self.assertEqual(2, bufio.tell())
        bufio.seek(0, 0)
        self.assertEqual(b"asdf", bufio.read(4))

        # Overwrite the next four bytes, then re-read from the start.
        bufio.write(b"123f")
        bufio.seek(0, 0)
        self.assertEqual(b"asdf123fl", bufio.read())
        self.assertEqual(9, bufio.tell())
        # Seek relative to the end (whence=2), then relative to the
        # current position (whence=1).
        for offset, whence, position in ((-4, 2, 5), (2, 1, 7)):
            bufio.seek(offset, whence)
            self.assertEqual(position, bufio.tell())
        # Asking for more than is left returns just the remainder.
        self.assertEqual(b"fl", bufio.read(11))
        bufio.flush()
        self.assertEqual(b"asdf123fl", backing.getvalue())

        self.assertRaises(TypeError, bufio.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # ``read_func`` is called as read_func(bufio, n) or
        # read_func(bufio) and must return the bytes it consumed.
        backing = self.BytesIO(b"abcdefghi")
        bufio = self.tp(backing)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        # The position is the same before and after flushing.
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Change the raw stream behind the buffered object's back.
        backing.seek(0, 0)
        backing.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        def _read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(_read)

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # A negative n means "read everything": use a large buffer.
            capacity = 9999 if n < 0 else n
            scratch = bytearray(capacity)
            filled = bufio.readinto(scratch)
            return bytes(scratch[:filled])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = bufio.peek(n)
            data = data if n == -1 else data[:n]
            # peek() does not advance the position, so do it by hand.
            bufio.seek(len(data), 1)
            return data
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        backing = self.BytesIO(b"abcdefghi")
        bufio = self.tp(backing)

        for chunk in (b"123", b"45"):
            bufio.write(chunk)
            bufio.flush()
        bufio.seek(0, 0)
        expected = b"12345fghi"
        self.assertEqual(expected, backing.getvalue())
        self.assertEqual(expected, bufio.read())

    def test_threads(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_threads(self)

    def test_writes_and_peek(self):
        def _peek_here(bufio):
            bufio.peek(1)
        self.check_writes(_peek_here)

        def _peek_one_back(bufio):
            # Step back one byte, peek, then restore the position.
            saved = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(saved, 0)
        self.check_writes(_peek_one_back)

    def test_writes_and_reads(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_reread_last)

    def test_writes_and_read1s(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_reread_last)

    def test_writes_and_readintos(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_reread_last)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in (1, 5):
            backing = self.BytesIO(b"A" * 10)
            bufio = self.tp(backing, 4)
            position_after_write = overwrite_size + 1
            expected = (b"A" + b"B" * overwrite_size +
                        b"A" * (9 - overwrite_size))
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if necessary
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), position_after_write)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), position_after_write)
            self.assertEqual(backing.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards /
        # writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        # Every (first, second) position pair with first <= second.
        positions = [(first, second)
                     for first in range(0, size)
                     for second in range(first, size)]
        for first, second in positions:
            backing = self.BytesIO(original)
            bufio = self.tp(backing, 100)
            mutate(bufio, first, second)
            bufio.flush()
            expected = bytearray(original)
            # When both positions coincide, the later write (1) wins.
            expected[second] = 2
            expected[first] = 1
            self.assertEqual(backing.getvalue(), expected,
                             "failed result for i=%d, j=%d" % (first, second))

    def test_truncate_after_read_or_write(self):
        backing = self.BytesIO(b"A" * 10)
        bufio = self.tp(backing, 100)
        # the read buffer gets filled
        self.assertEqual(bufio.read(2), b"AA")
        self.assertEqual(bufio.truncate(), 2)
        # the write buffer increases
        self.assertEqual(bufio.write(b"BB"), 2)
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw, self.tp(raw, 100) as f:
            f.write(b"1")
            self.assertEqual(f.read(1), b'b')
            f.write(b'2')
            self.assertEqual(f.read1(1), b'd')
            f.write(b'3')
            one_byte = bytearray(1)
            f.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            f.write(b'4')
            self.assertEqual(f.peek(1), b'h')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw, self.tp(raw, 100) as f:
            self.assertEqual(f.read(1), b'a')
            f.write(b"2")
            self.assertEqual(f.read(1), b'c')
            f.flush()
            self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as raw, self.tp(raw) as f:
            # Alternate one-byte writes with readline() calls.
            steps = ((b'1', b'b\n'), (b'2', b'def\n'), (b'3', b'\n'))
            for written, rest_of_line in steps:
                f.write(written)
                self.assertEqual(f.readline(), rest_of_line)
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1754
R David Murray5b2cf5e2013-02-23 22:11:21 -05001755
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
                          BufferedRandomTest, SizeofTest):
    """Runs the BufferedRandom tests against ``io.BufferedRandom``."""

    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-initialising with an enormous buffer size must be refused.
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            bufio.__init__(rawio, sys.maxsize)

    def test_garbage_collection(self):
        for parent in (CBufferedReaderTest, CBufferedWriterTest):
            parent.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError message has to
        # mention "BufferedRandom".
        self.assertRaisesRegexp(TypeError, "BufferedRandom",
                                self.tp, io.BytesIO(), 1024, 1024, 1024)
1778
1779
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the BufferedRandomTest suite against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1782
1783
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
#   - A single output character can correspond to many bytes of input.
#   - The number of input bytes to complete the character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
#   - A single input byte can correspond to many characters of output.
#   - The number of output characters can be undetermined until the
#     last input byte is received.
#   - The number of output characters can vary depending on previous input.
1794
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Go back to the initial state: I = O = 1, nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) with I and O packed into flags."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a (buffered bytes, flags) pair produced by getstate()."""
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of `input` and return the text completed so far.

        If `final` is true, a partially buffered word is flushed as well.
        """
        output = ''
        period = ord('.')
        for b in input:
            # Iterating yields 1-char strings for a Python 2 str but ints
            # for a bytearray or a Python 3 bytes object; normalise to an
            # int so the period test and bytearray.append() behave the
            # same for all of them.
            if not isinstance(b, int):
                b = ord(b)
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer: # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i<n>' and 'o<n>' words only update I or O and produce no text.
        The buffer must not be empty when this is called.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # The lookup function below only answers while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: serve 'test_decoder' while enabled.

        Returns None (codec not found) for any other name, or when
        `codecEnabled` is false.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
# Register the decoder defined above with the codecs machinery for testing.
# Its lookup function only answers while ``codecEnabled`` is true, so the
# codec is disabled by default; tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1883
1884
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         ''.join([
             'a----------------------------.',
             'b----------------------------.',
             'cde--------------------------.',
             'abcdefghijabcde.',
             'a.b------------.',
             '.c.------------.',
             'd.e------------.',
             'k--------------.',
             'l--------------.',
             'm--------------.',
         ]))
    ]

    def test_decoder(self):
        # Try a few one-shot test cases, each with a fresh decoder.
        for data, eof, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            decoded = decoder.decode(data, eof)
            self.assertEqual(decoded, expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001927
1928class TextIOWrapperTest(unittest.TestCase):
1929
def setUp(self):
    # Make sure no scratch file is left over from an earlier test.
    support.unlink(support.TESTFN)
    # Sample bytes mixing "\r\n", "\r" and "\n" line endings, plus the
    # same text with every line ending written as "\n".
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001934
def tearDown(self):
    # Remove the scratch file a test may have created.
    support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001937
def test_constructor(self):
    # Calling __init__ again on a live wrapper must apply the new
    # settings; invalid ``newline`` values must be rejected.
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    wrapper = self.TextIOWrapper(buffered)

    wrapper.__init__(buffered, encoding="latin1", newline="\r\n")
    self.assertEqual(wrapper.encoding, "latin1")
    self.assertEqual(wrapper.line_buffering, False)

    wrapper.__init__(buffered, encoding="utf8", line_buffering=True)
    self.assertEqual(wrapper.encoding, "utf8")
    self.assertEqual(wrapper.line_buffering, True)
    # With utf8 in effect the first two raw bytes decode to one character.
    self.assertEqual("\xe9\n", wrapper.readline())

    for exc_type, bad_newline in ((TypeError, 42), (ValueError, 'xyzzy')):
        self.assertRaises(exc_type, wrapper.__init__, buffered,
                          newline=bad_newline)
1951
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    wrapper = self.TextIOWrapper(buffered)
    # detach() hands back the very object that was wrapped.
    self.assertIs(wrapper.detach(), buffered)

    wrapper = self.TextIOWrapper(buffered, encoding="ascii")
    wrapper.write("howdy")
    # Nothing has reached the raw stream yet ...
    self.assertFalse(raw.getvalue())
    # ... but it has once the wrapper is detached.
    wrapper.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    # Detaching a second time is an error.
    self.assertRaises(ValueError, wrapper.detach)

    # Operations independent of the detached stream should still work
    repr(wrapper)
    self.assertEqual(wrapper.encoding, "ascii")
    self.assertEqual(wrapper.errors, "strict")
    self.assertFalse(wrapper.line_buffering)
1970
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    wrapper = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    # Without a name on the raw stream only the encoding is shown.
    self.assertEqual(repr(wrapper),
                     "<%s.TextIOWrapper encoding='utf-8'>" % modname)

    # (name set on the raw stream, repr template expected afterwards)
    named_cases = (
        ("dummy", "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>"),
        (b"dummy", "<%s.TextIOWrapper name='dummy' encoding='utf-8'>"),
    )
    for name, template in named_cases:
        raw.name = name
        self.assertEqual(repr(wrapper), template % modname)

    wrapper.buffer.detach()
    repr(wrapper)  # Should not raise an exception
1987
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    wrapper = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)

    # (text written, raw contents expected right after that write)
    steps = (
        ("X", b""),                # No flush happened
        ("Y\nZ", b"XY\nZ"),        # All got flushed
        ("A\rB", b"XY\nZA\rB"),
    )
    for text, raw_contents in steps:
        wrapper.write(text)
        self.assertEqual(raw.getvalue(), raw_contents)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001998
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="utf8")
    self.assertEqual(wrapper.encoding, "utf8")
    # Same raw stream, no explicit encoding this time.  The name is
    # rebound only after the new wrapper has been constructed.
    wrapper = self.TextIOWrapper(raw)
    self.assertIsNotNone(wrapper.encoding)
    # lookup() raises if the reported encoding is not a known codec.
    codecs.lookup(wrapper.encoding)
2007
def test_encoding_errors_reading(self):
    # Every case wraps its own copy of the same bytes, which contain one
    # byte (0xff) that is not valid ASCII.
    def ascii_reader(**options):
        source = self.BytesIO(b"abc\n\xff\n")
        return self.TextIOWrapper(source, encoding="ascii", **options)

    # (1) default
    reader = ascii_reader()
    self.assertRaises(UnicodeError, reader.read)
    # (2) explicit strict
    reader = ascii_reader(errors="strict")
    self.assertRaises(UnicodeError, reader.read)
    # (3) ignore
    reader = ascii_reader(errors="ignore")
    self.assertEqual(reader.read(), "abc\n\n")
    # (4) replace
    reader = ascii_reader(errors="replace")
    self.assertEqual(reader.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002025
def test_encoding_errors_writing(self):
    # Every case gets a fresh sink and an ASCII wrapper around it.
    def ascii_writer(**options):
        sink = self.BytesIO()
        return sink, self.TextIOWrapper(sink, encoding="ascii", **options)

    # (1) default
    sink, writer = ascii_writer()
    self.assertRaises(UnicodeError, writer.write, "\xff")
    # (2) explicit strict
    sink, writer = ascii_writer(errors="strict")
    self.assertRaises(UnicodeError, writer.write, "\xff")
    # (3) ignore drops the offending character,
    # (4) replace substitutes "?" for it.
    for policy, expected in (("ignore", b"abcdef\n"),
                             ("replace", b"abc?def\n")):
        sink, writer = ascii_writer(errors=policy, newline="\n")
        writer.write("abc\xffdef\n")
        writer.flush()
        self.assertEqual(sink.getvalue(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002049
def test_newlines(self):
    input_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]

    # (newline argument, lines expected when reading input_lines back)
    newline_cases = (
        (None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']),
        ('', input_lines),
        ('\n', ["unix\n", "windows\r\n", "os9\rlast\n", "nonl"]),
        ('\r\n', ["unix\nwindows\r\n", "os9\rlast\nnonl"]),
        ('\r', ["unix\nwindows\r", "\nos9\r", "last\nnonl"]),
    )
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    def collect_lines(textio, do_reads):
        # Without reads, plain iteration yields the lines.  With reads,
        # each line is fetched as a 2-character read() plus a readline()
        # for the rest.
        if not do_reads:
            return list(textio)
        lines = []
        for head in iter(lambda: textio.read(2), ''):
            self.assertEqual(len(head), 2)
            lines.append(head + textio.readline())
        return lines

    text = ''.join(input_lines)
    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        # XXX: str.encode() should return bytes
        data = bytes(text.encode(encoding))
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in newline_cases:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    got_lines = collect_lines(textio, do_reads)

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002091
def test_newlines_input(self):
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    # (newline argument, lines expected from readlines())
    cases = [
        (None, normalized.decode("ascii").splitlines(True)),
        ("", testdata.decode("ascii").splitlines(True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        source = self.BytesIO(testdata)
        reader = self.TextIOWrapper(source, encoding="ascii", newline=newline)
        self.assertEqual(reader.readlines(), expected)
        # Reading everything in one go must give the same text.
        reader.seek(0)
        self.assertEqual(reader.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002107
def test_newlines_output(self):
    # Bytes expected in the sink for each explicit ``newline`` value.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is expected to behave like newline=os.linesep.
    tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
    pieces = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in tests:
        sink = self.BytesIO()
        writer = self.TextIOWrapper(sink, encoding="ascii", newline=newline)
        for piece in pieces:
            writer.write(piece)
        writer.flush()
        self.assertEqual(sink.closed, False)
        self.assertEqual(sink.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00002125
def test_destructor(self):
    # Destroying the wrapper must flush the pending text and close the
    # underlying stream; close() records what the stream held by then.
    seen_at_close = []
    base = self.BytesIO

    class MyBytesIO(base):
        def close(self):
            seen_at_close.append(self.getvalue())
            base.close(self)

    sink = MyBytesIO()
    wrapper = self.TextIOWrapper(sink, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Antoine Pitrou19690592009-06-12 20:14:08 +00002139
def test_override_destructor(self):
    # Records 1 for __del__, 2 for close(), 3 for flush(), in call order.
    record = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            record.append(1)
            # The base class may or may not define __del__.
            parent_del = getattr(super(MyTextIO, self), "__del__", None)
            if parent_del is not None:
                parent_del()

        def close(self):
            record.append(2)
            super(MyTextIO, self).close()

        def flush(self):
            record.append(3)
            super(MyTextIO, self).flush()

    sink = self.BytesIO()
    wrapper = MyTextIO(sink, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(record, [1, 2, 3])
2162
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    stderr_text = captured.getvalue().strip()
    if not stderr_text:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(stderr_text.splitlines()), 1)
    self.assertTrue(stderr_text.startswith("Exception IOError: "),
                    stderr_text)
    self.assertTrue(stderr_text.endswith(" ignored"), stderr_text)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002177
2178 # Systematic tests of the text I/O API
2179
def test_basic_io(self):
    # Write, read, seek and append through a real file, for several
    # decoding chunk sizes and encodings.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
            # The ``with`` blocks close the file even when an assertion
            # fails, so a failing run does not leak the handle (an open
            # handle can prevent the file's removal on some platforms).
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Opaque position cookie for the end of the file.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                # Reads at end-of-file return the empty string.
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                # Append, then read the new text back from the cookie.
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2208
def multi_line_test(self, f, enc):
    # Helper: empty the file `f`, write lines of many different lengths
    # while recording tell() before each one, then read them back with
    # readline() and check that both the text and the positions match.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    lengths = (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000)
    expected = []
    for size in lengths:
        # Cycle through the sample characters to build `size` characters.
        body = "".join(sample[i % len(sample)] for i in range(size))
        line = body + "\n"
        expected.append((f.tell(), line))
        f.write(line)

    f.seek(0)
    actual = []
    pos = f.tell()
    line = f.readline()
    while line:
        actual.append((pos, line))
        pos = f.tell()
        line = f.readline()
    self.assertEqual(actual, expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002230
def test_telling(self):
    # tell() must report the same positions when the lines are read back
    # as it did when they were written, and must raise IOError while the
    # file is being iterated over.
    f = self.open(support.TESTFN, "w+", encoding="utf8")
    text = "\xff\n"
    marks = [f.tell()]
    for _ in range(2):
        f.write(text)
        marks.append(f.tell())
    f.seek(0)
    self.assertEqual(f.tell(), marks[0])
    for mark in marks[1:]:
        self.assertEqual(f.readline(), text)
        self.assertEqual(f.tell(), mark)
    f.seek(0)
    for line in f:
        self.assertEqual(line, text)
        self.assertRaises(IOError, f.tell)
    # Once iteration is exhausted tell() works again.
    self.assertEqual(f.tell(), marks[-1])
    f.close()
2250
def test_seeking(self):
    # The ASCII prefix is two bytes shorter than the value returned by
    # _default_chunk_size(), so the three-byte UTF-8 character that
    # follows it crosses that boundary.  tell() must still be exact right
    # after the prefix and readline() must return the complete suffix.
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = bytes(u_suffix.encode("utf-8"))
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    # Context manager: the reader is closed even if an assertion fails,
    # so no descriptor is leaked and the file can be unlinked afterwards.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002268
def test_seeking_too(self):
    # Regression test for a specific bug: readline() followed by tell()
    # with a chunk size smaller than the three-byte UTF-8 character at
    # the start of the file.  The test passes if neither call raises.
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    # Context manager: make sure the reader is closed instead of being
    # left open until it happens to be garbage collected.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
2280
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # A small chunk size keeps the seeks short and the test fast.
    CHUNK_SIZE = 128

    def check_round_trip(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        writer = self.open(support.TESTFN, 'wb')
        writer.write(data)
        writer.close()
        reader = self.open(support.TESTFN, encoding='test_decoder')
        reader._CHUNK_SIZE = CHUNK_SIZE
        decoded = reader.read()
        reader.close()

        for start in range(min_pos, len(decoded) + 1):  # seek positions
            for length in (1, 5, len(decoded) - start):  # read lengths
                reader = self.open(support.TESTFN, encoding='test_decoder')
                self.assertEqual(reader.read(start), decoded[:start])
                cookie = reader.tell()
                self.assertEqual(reader.read(length),
                                 decoded[start:start + length])
                # Going back to the cookie must yield the same tail.
                reader.seek(cookie)
                self.assertEqual(reader.read(), decoded[start:])
                reader.close()

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for payload, _, _ in StatefulIncrementalDecoderTest.test_cases:
            check_round_trip(payload)

        # Position each test case so that it crosses a chunk boundary.
        for payload, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(payload)//2
            padding = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            first_pos = offset*2
            check_round_trip(padding + payload, first_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002327
Antoine Pitrou19690592009-06-12 20:14:08 +00002328 def test_encoded_writes(self):
2329 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002330 tests = ("utf-16",
2331 "utf-16-le",
2332 "utf-16-be",
2333 "utf-32",
2334 "utf-32-le",
2335 "utf-32-be")
2336 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002337 buf = self.BytesIO()
2338 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002339 # Check if the BOM is written only once (see issue1753).
2340 f.write(data)
2341 f.write(data)
2342 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002343 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002344 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002345 self.assertEqual(f.read(), data * 2)
2346 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002347
Antoine Pitrou19690592009-06-12 20:14:08 +00002348 def test_unreadable(self):
2349 class UnReadable(self.BytesIO):
2350 def readable(self):
2351 return False
2352 txt = self.TextIOWrapper(UnReadable())
2353 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002354
Antoine Pitrou19690592009-06-12 20:14:08 +00002355 def test_read_one_by_one(self):
2356 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002357 reads = ""
2358 while True:
2359 c = txt.read(1)
2360 if not c:
2361 break
2362 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002363 self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002364
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002365 def test_readlines(self):
2366 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2367 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2368 txt.seek(0)
2369 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2370 txt.seek(0)
2371 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2372
Christian Heimes1a6387e2008-03-26 12:49:49 +00002373 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002374 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002375 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002376 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002377 reads = ""
2378 while True:
2379 c = txt.read(128)
2380 if not c:
2381 break
2382 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002383 self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002384
Antoine Pitroueadca1d2012-10-16 23:02:27 +02002385 def test_writelines(self):
2386 l = ['ab', 'cd', 'ef']
2387 buf = self.BytesIO()
2388 txt = self.TextIOWrapper(buf)
2389 txt.writelines(l)
2390 txt.flush()
2391 self.assertEqual(buf.getvalue(), b'abcdef')
2392
def test_writelines_userlist(self):
    # writelines() also accepts a UserList instead of a real list.
    pieces = UserList(['ab', 'cd', 'ef'])
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink)
    wrapper.writelines(pieces)
    # Flush so the text actually reaches the underlying bytes buffer.
    wrapper.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
2400
2401 def test_writelines_error(self):
2402 txt = self.TextIOWrapper(self.BytesIO())
2403 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2404 self.assertRaises(TypeError, txt.writelines, None)
2405 self.assertRaises(TypeError, txt.writelines, b'abc')
2406
Christian Heimes1a6387e2008-03-26 12:49:49 +00002407 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002408 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002409
2410 # read one char at a time
2411 reads = ""
2412 while True:
2413 c = txt.read(1)
2414 if not c:
2415 break
2416 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002417 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002418
2419 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002420 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002421 txt._CHUNK_SIZE = 4
2422
2423 reads = ""
2424 while True:
2425 c = txt.read(4)
2426 if not c:
2427 break
2428 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002429 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002430
2431 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002432 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002433 txt._CHUNK_SIZE = 4
2434
2435 reads = txt.read(4)
2436 reads += txt.read(4)
2437 reads += txt.readline()
2438 reads += txt.readline()
2439 reads += txt.readline()
Ezio Melotti2623a372010-11-21 13:34:58 +00002440 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002441
2442 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002443 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002444 txt._CHUNK_SIZE = 4
2445
2446 reads = txt.read(4)
2447 reads += txt.read()
Ezio Melotti2623a372010-11-21 13:34:58 +00002448 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002449
2450 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002451 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002452 txt._CHUNK_SIZE = 4
2453
2454 reads = txt.read(4)
2455 pos = txt.tell()
2456 txt.seek(0)
2457 txt.seek(pos)
Ezio Melotti2623a372010-11-21 13:34:58 +00002458 self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002459
2460 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002461 buffer = self.BytesIO(self.testdata)
2462 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002463
2464 self.assertEqual(buffer.seekable(), txt.seekable())
2465
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN

    def on_disk():
        # Raw bytes currently stored in the test file.
        with self.open(path, 'rb') as raw:
            return raw.read()

    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            pos = writer.tell()
        self.assertEqual(on_disk(), 'aaa'.encode(charset))

        with self.open(path, 'a', encoding=charset) as appender:
            appender.write('xxx')
        self.assertEqual(on_disk(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002480
def test_seek_bom(self):
    # The BOM must appear only once when the file is rewritten after
    # positioning manually with seek().
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            end = writer.tell()
        with self.open(path, 'r+', encoding=charset) as updater:
            # Write at the recorded end first, then overwrite the start.
            for offset, text in ((end, 'zzz'), (0, 'bbb')):
                updater.seek(offset)
                updater.write(text)
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002495
def test_errors_property(self):
    # The errors attribute is "strict" when nothing is requested and
    # otherwise reflects the errors= argument given to open().
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for extra, expected in cases:
        with self.open(support.TESTFN, "w", **extra) as f:
            self.assertEqual(f.errors, expected)
2501
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    go = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def writer(n):
            # Format the line first, then block until every thread has
            # been started so that the writes race with each other.
            text = "Thread%03d\n" % n
            go.wait()
            f.write(text)
        workers = [threading.Thread(target=writer, args=(x,))
                   for x in range(20)]
        for worker in workers:
            worker.start()
        time.sleep(0.02)
        go.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Each thread's line must be present exactly once.
        for n in range(20):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002523
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002524 def test_flush_error_on_close(self):
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002525 # Test that text file is closed despite failed flush
2526 # and that flush() is called before file closed.
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002527 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002528 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002529 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002530 closed[:] = [txt.closed, txt.buffer.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002531 raise IOError()
2532 txt.flush = bad_flush
2533 self.assertRaises(IOError, txt.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -06002534 self.assertTrue(txt.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002535 self.assertTrue(txt.buffer.closed)
2536 self.assertTrue(closed) # flush() called
2537 self.assertFalse(closed[0]) # flush() called before file closed
2538 self.assertFalse(closed[1])
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002539
2540 def test_multi_close(self):
2541 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2542 txt.close()
2543 txt.close()
2544 txt.close()
2545 self.assertRaises(ValueError, txt.flush)
2546
Antoine Pitroufc9ead62010-12-21 21:26:55 +00002547 def test_readonly_attributes(self):
2548 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2549 buf = self.BytesIO(self.testdata)
2550 with self.assertRaises((AttributeError, TypeError)):
2551 txt.buffer = buf
2552
def test_read_nonbytes(self):
    # Issue #17106
    # Crash when underlying read() returns non-bytes
    class NonbytesStream(self.StringIO):
        # read1() hands back text, i.e. the wrong type for a buffer.
        read1 = self.StringIO.read
    # A fresh wrapper is used for every call so that the state left by
    # one attempt cannot influence the next.  maybeRaises() comes from
    # the test class the method runs in and decides whether TypeError is
    # actually required.
    t = self.TextIOWrapper(NonbytesStream('a'))
    with self.maybeRaises(TypeError):
        t.read(1)
    t = self.TextIOWrapper(NonbytesStream('a'))
    with self.maybeRaises(TypeError):
        t.readline()
    t = self.TextIOWrapper(NonbytesStream('a'))
    self.assertEqual(t.read(), u'a')
2568
def test_illegal_decoder(self):
    # Issue #17106
    # Crash when decoder returns non-string
    def fresh_wrapper():
        # A new wrapper per attempt, always with the same arguments.
        return self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
                                  encoding='quopri_codec')

    attempts = (
        lambda wrapper: wrapper.read(1),
        lambda wrapper: wrapper.readline(),
        lambda wrapper: wrapper.read(),
    )
    for attempt in attempts:
        t = fresh_wrapper()
        with self.maybeRaises(TypeError):
            attempt(t)
2584
2585
class CTextIOWrapperTest(TextIOWrapperTest):
    # Adds checks for re-initialisation and garbage collection to the
    # inherited tests, and requires the exceptions guarded by
    # maybeRaises() to really be raised.

    def test_initialization(self):
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        t = self.TextIOWrapper(buffered)
        # After each rejected __init__ call, read() must raise ValueError.
        for expected_error, bad_newline in ((TypeError, 42),
                                            (ValueError, 'xyzzy')):
            self.assertRaises(expected_error, t.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, t.read)

        # repr() of an instance that never went through __init__ must
        # raise rather than crash.
        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
        self.assertRaises(Exception, repr, t)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        raw_file = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(raw_file)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Reference cycle: only the garbage collector can free the object.
        wrapper.x = wrapper
        ref = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()

    # Here the guarded operations must raise: use the real assertRaises.
    maybeRaises = unittest.TestCase.assertRaises
2631
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002632
class PyTextIOWrapperTest(TextIOWrapperTest):
    # maybeRaises() accepts whatever arguments it is given and ignores
    # them: the with-body simply runs, and no exception is demanded.
    @contextlib.contextmanager
    def maybeRaises(self, *_expected, **_options):
        yield
Antoine Pitrou19690592009-06-12 20:14:08 +00002637
2638
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Tests for the class reachable as self.IncrementalNewlineDecoder.
    # The decoders are stateful, so within each check the order of the
    # calls matters.

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            # Decoding `b` must give `s`, and must give `s` again after
            # the decoder is put back into the state it had before.
            state = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            decoder.setstate(state)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        # A complete three-byte character in one call.
        _check_decode(b'\xe8\xa2\x88', "\u8888")

        # The same character fed one byte at a time: nothing is returned
        # until the last byte arrives.
        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        # A truncated character is an error once final=True is passed.
        _check_decode(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        # A lone '\r' is held back until the decoder knows what follows,
        # or until final=True forces it out as '\n'.
        _check_decode(b'\n', "\n")
        _check_decode(b'\r', "")
        _check_decode(b'', "\n", final=True)
        _check_decode(b'\r', "\n", final=True)

        _check_decode(b'\r', "")
        _check_decode(b'a', "\na")

        _check_decode(b'\r\r\n', "\n\n")
        _check_decode(b'\r', "")
        _check_decode(b'\r', "\n")
        _check_decode(b'\na', "\na")

        # Newlines directly after a multi-byte character, including a
        # '\r\n' pair split across two calls.
        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
        _check_decode(b'\xe8\xa2\x88', "\u8888")
        _check_decode(b'\n', "\n")
        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
        _check_decode(b'\n', "\n")

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to `decoder` one unit at a time and check both the
        # translated output and the `newlines` attribute.  When
        # `encoding` is None the decoder is fed text directly; otherwise
        # the text is first encoded with `encoding`.
        result = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()
            def _decode_bytewise(s):
                # Decode one byte at a time
                for b in encoder.encode(s):
                    result.append(decoder.decode(b))
        else:
            encoder = None
            def _decode_bytewise(s):
                # Decode one char at a time
                for c in s:
                    result.append(decoder.decode(c))
        self.assertEqual(decoder.newlines, None)
        # The trailing '\r' is not reported yet: only '\n' has been seen.
        _decode_bytewise("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        # The leading '\n' completes a '\r\n' pair.
        _decode_bytewise("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        # A '\r' followed by ordinary text counts as a bare '\r'.
        _decode_bytewise("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        # After reset() the record of newlines seen is empty again.
        decoder.reset()
        input = "abc"
        if encoder is not None:
            encoder.reset()
            input = encoder.encode(input)
        self.assertEqual(decoder.decode(input), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            # `enc and ...` leaves decoder as None when enc is None.
            decoder = enc and codecs.getincrementaldecoder(enc)()
            decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
            self.check_newline_decoding(decoder, enc)
        # The UTF-8 specific checks run on a fresh decoder.
        decoder = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
        self.check_newline_decoding_utf8(decoder)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # U+0D00 and U+0A00 must pass through unchanged and must not
            # be recorded as newlines.
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
            self.assertEqual(dec.newlines, None)
        dec = self.IncrementalNewlineDecoder(None, translate=False)
        _check(dec)
        dec = self.IncrementalNewlineDecoder(None, translate=True)
        _check(dec)
2744
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own: the inherited tests run unchanged.
    # NOTE(review): presumably self.IncrementalNewlineDecoder is bound to
    # the C implementation elsewhere in this file -- confirm.
    pass
2747
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own: the inherited tests run unchanged.
    # NOTE(review): presumably self.IncrementalNewlineDecoder is bound to
    # the pure-Python implementation elsewhere in this file -- confirm.
    pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002750
Christian Heimes1a6387e2008-03-26 12:49:49 +00002751
2752# XXX Tests for open()
2753
2754class MiscIOTest(unittest.TestCase):
2755
def tearDown(self):
    # Remove the scratch file at support.TESTFN that the tests in this
    # class open and write to.
    support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002758
def test___all__(self):
    # Every name listed in __all__ must exist in the module.  Apart from
    # open() and the SEEK_* names, each one must be either an exception
    # class or a subclass of IOBase.
    for name in self.io.__all__:
        obj = getattr(self.io, name, None)
        self.assertTrue(obj is not None, name)
        if name == "open":
            continue
        if "error" in name.lower() or name == "UnsupportedOperation":
            self.assertTrue(issubclass(obj, Exception), name)
            continue
        if name.startswith("SEEK_"):
            continue
        self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002769
def test_attributes(self):
    # Check the name and mode attributes reported by each layer of the
    # objects returned by open().

    # Unbuffered binary file.
    f = self.open(support.TESTFN, "wb", buffering=0)
    self.assertEqual(f.mode, "wb")
    f.close()

    # Text file opened with "U": all three layers share the file name.
    f = self.open(support.TESTFN, "U")
    for layer in (f, f.buffer, f.buffer.raw):
        self.assertEqual(layer.name, support.TESTFN)
    self.assertEqual(f.mode, "U")
    for layer in (f.buffer, f.buffer.raw):
        self.assertEqual(layer.mode, "rb")
    f.close()

    # Text file opened for update.
    f = self.open(support.TESTFN, "w+")
    self.assertEqual(f.mode, "w+")
    self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
    self.assertEqual(f.buffer.raw.mode, "rb+")

    # A second object on the same descriptor reports the descriptor
    # number as its name.
    g = self.open(f.fileno(), "wb", closefd=False)
    for layer in (g, g.raw):
        self.assertEqual(layer.mode, "wb")
    for layer in (g, g.raw):
        self.assertEqual(layer.name, f.fileno())
    f.close()
    g.close()
2796
def test_io_after_close(self):
    # Whatever mode and buffering a file was opened with, every I/O
    # method must raise ValueError once the file has been closed.
    open_variants = [
            {"mode": "w"},
            {"mode": "wb"},
            {"mode": "w", "buffering": 1},
            {"mode": "w", "buffering": 2},
            {"mode": "wb", "buffering": 0},
            {"mode": "r"},
            {"mode": "rb"},
            {"mode": "r", "buffering": 1},
            {"mode": "r", "buffering": 2},
            {"mode": "rb", "buffering": 0},
            {"mode": "w+"},
            {"mode": "w+b"},
            {"mode": "w+", "buffering": 1},
            {"mode": "w+", "buffering": 2},
            {"mode": "w+b", "buffering": 0},
        ]
    for kwargs in open_variants:
        f = self.open(support.TESTFN, **kwargs)
        f.close()
        # write() needs bytes for binary files and text otherwise.
        empty = b"" if "b" in kwargs['mode'] else ""
        # (method name, call arguments, True if only some objects have it)
        checks = [
            ("flush", (), False),
            ("fileno", (), False),
            ("isatty", (), False),
            ("__iter__", (), False),
            ("peek", (1,), True),
            ("read", (), False),
            ("read1", (1024,), True),
            ("readall", (), True),
            ("readinto", (bytearray(1024),), True),
            ("readline", (), False),
            ("readlines", (), False),
            ("seek", (0,), False),
            ("tell", (), False),
            ("truncate", (), False),
            ("write", (empty,), False),
            ("writelines", ([],), False),
        ]
        for method_name, args, optional in checks:
            if optional and not hasattr(f, method_name):
                continue
            self.assertRaises(ValueError, getattr(f, method_name), *args)
        self.assertRaises(ValueError, next, f)
2839
def test_blockingioerror(self):
    # Various BlockingIOError issues
    # These argument lists are all rejected with TypeError.
    for bad_args in ((), (1,), (1, 2, 3, 4), (1, "", None)):
        self.assertRaises(TypeError, self.BlockingIOError, *bad_args)
    err = self.BlockingIOError(1, "")
    self.assertEqual(err.characters_written, 0)
    class C(unicode):
        pass
    # Build a reference cycle between an exception and its message and
    # check that the garbage collector can reclaim it.
    message = C("")
    err = self.BlockingIOError(1, message)
    message.b = err
    err.c = message
    ref = weakref.ref(message)
    del message, err
    support.gc_collect()
    self.assertTrue(ref() is None, ref)
Antoine Pitrou19690592009-06-12 20:14:08 +00002858
2859 def test_abcs(self):
2860 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002861 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2862 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2863 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2864 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002865
def _check_abc_inheritance(self, abcmodule):
    """Check each kind of opened file against the ABCs found in *abcmodule*.

    An unbuffered binary file must be a RawIOBase only, a buffered binary
    file a BufferedIOBase only and a text file a TextIOBase only; all of
    them must be IOBase instances.
    """
    layer_names = ("RawIOBase", "BufferedIOBase", "TextIOBase")
    # (mode, extra open() keywords, the single layer ABC that must match)
    scenarios = (
        ("wb", {"buffering": 0}, "RawIOBase"),
        ("wb", {}, "BufferedIOBase"),
        ("w", {}, "TextIOBase"),
    )
    for mode, extra, expected_layer in scenarios:
        with self.open(support.TESTFN, mode, **extra) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            for layer_name in layer_names:
                layer = getattr(abcmodule, layer_name)
                if layer_name == expected_layer:
                    self.assertIsInstance(f, layer)
                else:
                    self.assertNotIsInstance(f, layer)
Antoine Pitrou19690592009-06-12 20:14:08 +00002882
def test_abc_inheritance(self):
    # Test implementations inherit from their respective ABCs.  The test
    # case itself serves as the namespace holding the ABCs to check
    # against.
    abc_namespace = self
    self._check_abc_inheritance(abc_namespace)
2886
def test_abc_inheritance_official(self):
    # Test implementations inherit from the official ABCs of the
    # baseline "io" module.
    official_namespace = io
    self._check_abc_inheritance(official_namespace)
2891
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_bigbuf(self):
    # 16 KiB buffer.
    big_buffer = 16 * 1024
    self._test_nonblock_pipe_write(big_buffer)
2895
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_smallbuf(self):
    # 1 KiB buffer.
    small_buffer = 1024
    self._test_nonblock_pipe_write(small_buffer)
2899
2900 def _set_non_blocking(self, fd):
2901 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2902 self.assertNotEqual(flags, -1)
2903 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2904 self.assertEqual(res, 0)
2905
def _test_nonblock_pipe_write(self, bufsize):
    """Write through a non-blocking pipe and check nothing is lost.

    Data is written until the buffered writer raises BlockingIOError, the
    reader is drained, and the cycle repeats with several message sizes.
    Everything recorded as sent must equal everything received.

    *bufsize* is the buffer size handed to both ends of the pipe.
    """
    sent = []
    received = []
    r, w = os.pipe()
    self._set_non_blocking(r)
    self._set_non_blocking(w)

    # To exercise all code paths in the C implementation we need
    # to play with buffer sizes.  For instance, if we choose a
    # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
    # then we will never get a partial write of the buffer.
    rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
    wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

    with rf, wf:
        for N in 9999, 73, 7574:
            try:
                i = 0
                while True:
                    # Go through bytearray so that a single letter is
                    # produced on Python 2 as well, where bytes is str
                    # and bytes([97]) would be the list repr "[97]".
                    msg = bytes(bytearray([i % 26 + 97])) * N
                    sent.append(msg)
                    wf.write(msg)
                    i += 1

            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                # Only the accepted prefix of the last message counts
                # as sent.
                sent[-1] = sent[-1][:e.characters_written]
                received.append(rf.read())
                msg = b'BLOCKED'
                wf.write(msg)
                sent.append(msg)

        # Keep draining the reader until the writer manages to flush.
        while True:
            try:
                wf.flush()
                break
            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.characters_written, 0)
                received.append(rf.read())

        # Collect whatever is left; read() returns None once the
        # non-blocking pipe has no more data.
        received += iter(rf.read, None)

    sent, received = b''.join(sent), b''.join(received)
    self.assertTrue(sent == received)
    self.assertTrue(wf.closed)
    self.assertTrue(rf.closed)
2953
class CMiscIOTest(MiscIOTest):
    # Run the shared MiscIOTest cases against the "io" module.
    io = io
2956
class PyMiscIOTest(MiscIOTest):
    # Run the shared MiscIOTest cases against the "pyio" module.
    io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002959
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002960
2961@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2962class SignalsTest(unittest.TestCase):
2963
2964 def setUp(self):
2965 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2966
2967 def tearDown(self):
2968 signal.signal(signal.SIGALRM, self.oldalrm)
2969
2970 def alarm_interrupt(self, sig, frame):
Florent Xicluna3fa3b002010-09-13 08:20:19 +00002971 1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002972
2973 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinner49d495f2011-07-04 11:44:46 +02002974 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
2975 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002976 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2977 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou6439c002011-02-25 21:35:47 +00002978 invokes the signal handler, and bubbles up the exception raised
2979 in the latter."""
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002980 read_results = []
2981 def _read():
2982 s = os.read(r, 1)
2983 read_results.append(s)
2984 t = threading.Thread(target=_read)
2985 t.daemon = True
2986 r, w = os.pipe()
2987 try:
2988 wio = self.io.open(w, **fdopen_kwargs)
2989 t.start()
2990 signal.alarm(1)
2991 # Fill the pipe enough that the write will be blocking.
2992 # It will be interrupted by the timer armed above. Since the
2993 # other thread has read one byte, the low-level write will
2994 # return with a successful (partial) result rather than an EINTR.
2995 # The buffered IO layer must check for pending signal
2996 # handlers, which in this case will invoke alarm_interrupt().
2997 self.assertRaises(ZeroDivisionError,
Antoine Pitrou68915d72013-04-24 23:31:38 +02002998 wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002999 t.join()
3000 # We got one byte, get another one and check that it isn't a
3001 # repeat of the first one.
3002 read_results.append(os.read(r, 1))
3003 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3004 finally:
3005 os.close(w)
3006 os.close(r)
3007 # This is deliberate. If we didn't close the file descriptor
3008 # before closing wio, wio would try to flush its internal
3009 # buffer, and block again.
3010 try:
3011 wio.close()
3012 except IOError as e:
3013 if e.errno != errno.EBADF:
3014 raise
3015
3016 def test_interrupted_write_unbuffered(self):
3017 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3018
3019 def test_interrupted_write_buffered(self):
3020 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3021
3022 def test_interrupted_write_text(self):
3023 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3024
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003025 def check_reentrant_write(self, data, **fdopen_kwargs):
3026 def on_alarm(*args):
3027 # Will be called reentrantly from the same thread
3028 wio.write(data)
Victor Stinner4c41f842011-07-05 13:29:26 +02003029 1//0
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003030 signal.signal(signal.SIGALRM, on_alarm)
3031 r, w = os.pipe()
3032 wio = self.io.open(w, **fdopen_kwargs)
3033 try:
3034 signal.alarm(1)
3035 # Either the reentrant call to wio.write() fails with RuntimeError,
3036 # or the signal handler raises ZeroDivisionError.
3037 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3038 while 1:
3039 for i in range(100):
3040 wio.write(data)
3041 wio.flush()
3042 # Make sure the buffer doesn't fill up and block further writes
3043 os.read(r, len(data) * 100)
3044 exc = cm.exception
3045 if isinstance(exc, RuntimeError):
3046 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3047 finally:
3048 wio.close()
3049 os.close(r)
3050
3051 def test_reentrant_write_buffered(self):
3052 self.check_reentrant_write(b"xy", mode="wb")
3053
3054 def test_reentrant_write_text(self):
3055 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3056
Antoine Pitrou6439c002011-02-25 21:35:47 +00003057 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3058 """Check that a buffered read, when it gets interrupted (either
3059 returning a partial result or EINTR), properly invokes the signal
3060 handler and retries if the latter returned successfully."""
3061 r, w = os.pipe()
3062 fdopen_kwargs["closefd"] = False
3063 def alarm_handler(sig, frame):
3064 os.write(w, b"bar")
3065 signal.signal(signal.SIGALRM, alarm_handler)
3066 try:
3067 rio = self.io.open(r, **fdopen_kwargs)
3068 os.write(w, b"foo")
3069 signal.alarm(1)
3070 # Expected behaviour:
3071 # - first raw read() returns partial b"foo"
3072 # - second raw read() returns EINTR
3073 # - third raw read() returns b"bar"
3074 self.assertEqual(decode(rio.read(6)), "foobar")
3075 finally:
3076 rio.close()
3077 os.close(w)
3078 os.close(r)
3079
3080 def test_interrupterd_read_retry_buffered(self):
3081 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3082 mode="rb")
3083
3084 def test_interrupterd_read_retry_text(self):
3085 self.check_interrupted_read_retry(lambda x: x,
3086 mode="r")
3087
3088 @unittest.skipUnless(threading, 'Threading required for this test.')
3089 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3090 """Check that a buffered write, when it gets interrupted (either
3091 returning a partial result or EINTR), properly invokes the signal
3092 handler and retries if the latter returned successfully."""
3093 select = support.import_module("select")
3094 # A quantity that exceeds the buffer size of an anonymous pipe's
3095 # write end.
Antoine Pitrou68915d72013-04-24 23:31:38 +02003096 N = support.PIPE_MAX_SIZE
Antoine Pitrou6439c002011-02-25 21:35:47 +00003097 r, w = os.pipe()
3098 fdopen_kwargs["closefd"] = False
3099 # We need a separate thread to read from the pipe and allow the
3100 # write() to finish. This thread is started after the SIGALRM is
3101 # received (forcing a first EINTR in write()).
3102 read_results = []
3103 write_finished = False
3104 def _read():
3105 while not write_finished:
3106 while r in select.select([r], [], [], 1.0)[0]:
3107 s = os.read(r, 1024)
3108 read_results.append(s)
3109 t = threading.Thread(target=_read)
3110 t.daemon = True
3111 def alarm1(sig, frame):
3112 signal.signal(signal.SIGALRM, alarm2)
3113 signal.alarm(1)
3114 def alarm2(sig, frame):
3115 t.start()
3116 signal.signal(signal.SIGALRM, alarm1)
3117 try:
3118 wio = self.io.open(w, **fdopen_kwargs)
3119 signal.alarm(1)
3120 # Expected behaviour:
3121 # - first raw write() is partial (because of the limited pipe buffer
3122 # and the first alarm)
3123 # - second raw write() returns EINTR (because of the second alarm)
3124 # - subsequent write()s are successful (either partial or complete)
3125 self.assertEqual(N, wio.write(item * N))
3126 wio.flush()
3127 write_finished = True
3128 t.join()
3129 self.assertEqual(N, sum(len(x) for x in read_results))
3130 finally:
3131 write_finished = True
3132 os.close(w)
3133 os.close(r)
3134 # This is deliberate. If we didn't close the file descriptor
3135 # before closing wio, wio would try to flush its internal
3136 # buffer, and could block (in case of failure).
3137 try:
3138 wio.close()
3139 except IOError as e:
3140 if e.errno != errno.EBADF:
3141 raise
3142
3143 def test_interrupterd_write_retry_buffered(self):
3144 self.check_interrupted_write_retry(b"x", mode="wb")
3145
3146 def test_interrupterd_write_retry_text(self):
3147 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3148
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003149
class CSignalsTest(SignalsTest):
    # Run the shared SignalsTest cases against the "io" module.
    io = io
3152
class PySignalsTest(SignalsTest):
    # Run the shared SignalsTest cases against the "pyio" module.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None
3160
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003161
def test_main():
    """Populate the C*/Py* test classes with their io namespace, then run
    every test class of this module."""
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    globs = globals()
    # Each mock is published under its plain name, bound to the "C"- or
    # "Py"-prefixed variant defined at module level.
    c_io_ns.update(
        {mock.__name__: globs["C" + mock.__name__] for mock in mocks})
    py_io_ns.update(
        {mock.__name__: globs["Py" + mock.__name__] for mock in mocks})
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test_class in tests:
        class_name = test_class.__name__
        if class_name.startswith("C"):
            namespace = c_io_ns
        elif class_name.startswith("Py"):
            namespace = py_io_ns
        else:
            # Not tied to one implementation: nothing to inject.
            continue
        for name, obj in namespace.items():
            setattr(test_class, name, obj)

    support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00003196
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()