blob: bbc804b6a58a3aa76a96d52c6b1b305ae359c92d [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +030032import warnings
Antoine Pitrou19690592009-06-12 20:14:08 +000033import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000034import signal
35import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000036from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000037from collections import deque
Antoine Pitrou78e761e2012-10-16 22:57:11 +020038from UserList import UserList
Antoine Pitrou19690592009-06-12 20:14:08 +000039from test import test_support as support
Serhiy Storchaka354d50e2013-02-03 17:10:42 +020040import contextlib
Christian Heimes1a6387e2008-03-26 12:49:49 +000041
42import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000043import io # C implementation of io
44import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000045try:
46 import threading
47except ImportError:
48 threading = None
Antoine Pitrou5aa7df32011-11-21 20:16:44 +010049try:
50 import fcntl
51except ImportError:
52 fcntl = None
Antoine Pitrou19690592009-06-12 20:14:08 +000053
# A module-level __metaclass__ makes every class statement below that names
# no base class a new-style class under Python 2.
__metaclass__ = type
# Rebind the module-level name ``bytes`` so that every later ``bytes(...)``
# call in this file goes through the test-support helper.
# NOTE(review): presumably this emulates the Python 3 bytes() constructor --
# confirm against test_support.
bytes = support.py3k_bytes
56
57def _default_chunk_size():
58 """Get the default TextIOWrapper chunk size"""
59 with io.open(__file__, "r", encoding="latin1") as f:
60 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000061
62
class MockRawIOWithoutRead:
    """Scripted raw stream that offers readinto() but no read().

    Leaving read() out means the default RawIO.read(), which calls
    readinto(), is what gets exercised.
    """

    def __init__(self, read_stack=()):
        # Chunks still to be handed out by readinto(); a None entry makes
        # one readinto() call return None.
        self._read_stack = list(read_stack)
        # A bytes copy of every buffer given to write(), in call order.
        self._write_stack = []
        # Total number of readinto() calls.
        self._reads = 0
        # readinto() calls made after the script had run out.
        self._extraneous_reads = 0

    # -- capabilities: the mock claims to support everything --------------

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def fileno(self):
        return 42

    # -- positioning: fixed dummy answers ----------------------------------

    def seek(self, pos, whence):
        # Not a real position, but something has to be returned.
        return 0

    def tell(self):
        # Same remark as for seek().
        return 0

    def truncate(self, pos=None):
        return pos

    # -- data transfer -----------------------------------------------------

    def write(self, b):
        self._write_stack.append(bytes(b))
        return len(b)

    def readinto(self, buf):
        self._reads += 1
        if not self._read_stack:
            # Nothing scripted any more: count the call and report 0 bytes.
            self._extraneous_reads += 1
            return 0
        head = self._read_stack[0]
        if head is None:
            self._read_stack.pop(0)
            return None
        room = len(buf)
        size = len(head)
        if size > room:
            # Fill the whole buffer and keep the remainder for next time.
            buf[:] = head[:room]
            self._read_stack[0] = head[room:]
            return room
        # The chunk fits: consume it entirely.
        self._read_stack.pop(0)
        buf[:size] = head
        return size
118
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    # MockRawIOWithoutRead mixed with RawIOBase from ``io`` (imported at the
    # top of this file as the C implementation).
    pass
121
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    # MockRawIOWithoutRead mixed with RawIOBase from ``_pyio`` (imported at
    # the top of this file as the Python implementation).
    pass
124
125
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead extended with a scripted read().

    read() ignores the requested size: each call removes and returns the
    next entry of the read stack as it is.
    """

    def read(self, n=None):
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script is expected here; any other error
            # should surface instead of being turned into an empty read.
            self._extraneous_reads += 1
            return b""
135
class CMockRawIO(MockRawIO, io.RawIOBase):
    # MockRawIO mixed with RawIOBase from ``io`` (the C implementation).
    pass
138
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    # MockRawIO mixed with RawIOBase from ``_pyio`` (the Python
    # implementation).
    pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000141
142
class MisbehavedRawIO(MockRawIO):
    """MockRawIO whose methods deliberately return wrong results."""

    def write(self, b):
        # Report twice the length that was written.
        accepted = MockRawIO.write(self, b)
        return accepted * 2

    def read(self, n=None):
        # Hand back the scripted chunk repeated twice.
        chunk = MockRawIO.read(self, n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Perform the real transfer, then report five times the buffer size.
        MockRawIO.readinto(self, buf)
        capacity = len(buf)
        return capacity * 5
158 return len(buf) * 5
159
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    # MisbehavedRawIO mixed with RawIOBase from ``io`` (the C
    # implementation).
    pass
162
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    # MisbehavedRawIO mixed with RawIOBase from ``_pyio`` (the Python
    # implementation).
    pass
165
166
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() marks it closed and raises IOError."""

    closed = 0

    def close(self):
        if self.closed:
            # Already closed: later calls do nothing.
            return
        self.closed = 1
        raise IOError
173 raise IOError
174
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    # CloseFailureIO mixed with RawIOBase from ``io`` (the C
    # implementation).
    pass
177
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    # CloseFailureIO mixed with RawIOBase from ``_pyio`` (the Python
    # implementation).
    pass
180
181
class MockFileIO:
    """Mixin that logs the size of every read()/readinto() result.

    It relies on the next class in the MRO to supply __init__(data),
    read() and readinto().
    """

    def __init__(self, data):
        # One entry per read()/readinto() call, in call order.
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        chunk = super(MockFileIO, self).read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super(MockFileIO, self).readinto(b)
        self.read_history.append(count)
        return count
196 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000197
class CMockFileIO(MockFileIO, io.BytesIO):
    # MockFileIO's bookkeeping on top of BytesIO from ``io`` (the C
    # implementation).
    pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000200
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    # MockFileIO's bookkeeping on top of BytesIO from ``_pyio`` (the Python
    # implementation).
    pass
203
204
class MockNonBlockWriterIO:
    """Raw writer mock that can pretend a write would block.

    Once block_on() has armed a blocker character, write() stops in front
    of its first occurrence; see write() for the exact rules.
    """

    def __init__(self):
        # Accepted data, one entry per successful write() call.
        self._write_stack = []
        # Character currently armed by block_on(), or None.
        self._blocker_char = None

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def pop_written(self):
        """Return everything accepted so far and forget it."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def write(self, b):
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            cut = payload.find(blocker)
            if cut > 0:
                # write data up to the first blocker
                self._write_stack.append(payload[:cut])
                return cut
            if cut == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
            # cut == -1: the blocker does not occur, accept everything.
        self._write_stack.append(payload)
        return len(payload)
248
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    # MockNonBlockWriterIO mixed with RawIOBase from ``io`` (the C
    # implementation).
    # The matching BlockingIOError class is exposed as a class attribute.
    BlockingIOError = io.BlockingIOError
251
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    # MockNonBlockWriterIO mixed with RawIOBase from ``_pyio`` (the Python
    # implementation).
    # The matching BlockingIOError class is exposed as a class attribute.
    BlockingIOError = pyio.BlockingIOError
254
Christian Heimes1a6387e2008-03-26 12:49:49 +0000255
class IOTest(unittest.TestCase):
    # Implementation-independent tests of open(), FileIO, BytesIO and the
    # abstract IOBase classes.  The objects under test are always looked up
    # on ``self`` (self.open, self.FileIO, self.BytesIO, self.IOBase,
    # self.MockRawIOWithoutRead, ...).  Those attributes are not defined in
    # this class; they have to be supplied from outside before a test runs.

    def setUp(self):
        # Start each test without a leftover scratch file.
        support.unlink(support.TESTFN)

    def tearDown(self):
        # Remove the scratch file the test may have created.
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        # Drive the writable binary stream `f` through a fixed series of
        # write/seek/tell/truncate calls and check every return value.
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() must not move the stream position.
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        # Again, truncating leaves the position untouched.
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Check read/readinto/seek/tell on the readable binary stream `f`,
        # which must contain b"hello world\n".  With buffered=True the
        # argument-less read() is checked as well.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        # Only two bytes are left; the buffer keeps its size.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file checks.
    LARGE = 2**31

    def large_file_ops(self, f):
        # Check seek/tell/write/truncate/read around offset LARGE on the
        # read-write binary stream `f`.
        # unittest assertions are used for the preconditions because plain
        # ``assert`` statements are dropped when Python runs with -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_raw_file_io(self):
        # Unbuffered (buffering=0) binary file: write it, then read it back.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        # Same as test_raw_file_io, with the default buffering.
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        # The same write/read sequences must work on an in-memory stream.
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            support.requires(
                'largefile',
                'test requires %s bytes and a long time to run' % self.LARGE)
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The file must be closed when the with block ends, whether it ends
        # normally or through an exception.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1 // 0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1 // 0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            # assertGreater reports the actual value when it fails.
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Dropping the last reference must run __del__, close() and
        # flush(), in that order, and the written data must reach the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyFileIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyFileIO, self).close()
            def flush(self):
                record.append(3)
                super(MyFileIO, self).flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        # Same ordering check as test_destructor, for a bare subclass of the
        # abstract class `base`.
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super(MyIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super(MyIO, self).close()
            def flush(self):
                record.append(self.on_flush)
                super(MyIO, self).flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        # Data written before the implicit close() must be in the file.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() must accept an array and report its size in bytes.
        a = array.array(b'i', range(10))
        n = len(a.tostring())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            # Second file object on the same descriptor, opened with
            # closefd=False.  (Not named `file`: that would shadow the
            # Python 2 builtin.)
            fobj = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(fobj.read(), "egg\n")
            fobj.seek(0)
            fobj.close()
            self.assertRaises(ValueError, fobj.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            fobj = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(fobj.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Put the object in a reference cycle with itself.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def check_flush_error_on_close(self, *args, **kwargs):
        # Test that the file is closed despite failed flush
        # and that flush() is called before file closed.
        f = self.open(*args, **kwargs)
        closed = []
        def bad_flush():
            # Remember whether the file was already closed when flush()
            # was invoked, then fail.
            closed[:] = [f.closed]
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed
        self.assertTrue(f.closed)
        self.assertTrue(closed)      # flush() called
        self.assertFalse(closed[0])  # flush() called before file closed
        f.flush = lambda: None  # break reference loop

    def _check_flush_error_on_close_variants(self, mode, **kwargs):
        # Run check_flush_error_on_close() on TESTFN three ways: opened by
        # name, through a descriptor the file object closes itself, and
        # through a descriptor opened with closefd=False.
        self.check_flush_error_on_close(support.TESTFN, mode, **kwargs)
        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
        self.check_flush_error_on_close(fd, mode, **kwargs)
        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
        try:
            self.check_flush_error_on_close(fd, mode, closefd=False, **kwargs)
        finally:
            # With closefd=False the descriptor is ours to close; do it
            # even when the check fails so that it is not leaked.
            os.close(fd)

    def test_flush_error_on_close(self):
        # raw file
        # Issue #5700: io.FileIO calls flush() after file closed
        self._check_flush_error_on_close_variants('wb', buffering=0)
        # buffered io
        self._check_flush_error_on_close_variants('wb')
        # text io
        self._check_flush_error_on_close_variants('w')

    def test_multi_close(self):
        # close() may be called repeatedly; flush() on the closed file
        # raises ValueError.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()

    def test_nonbuffered_textio(self):
        # The rejected open() call must not leave anything behind that
        # emits a warning when it is collected.
        with warnings.catch_warnings(record=True) as recorded:
            with self.assertRaises(ValueError):
                self.open(support.TESTFN, 'w', buffering=0)
            support.gc_collect()
        self.assertEqual(recorded, [])

    def test_invalid_newline(self):
        # Same check as test_nonbuffered_textio, for a bad newline argument.
        with warnings.catch_warnings(record=True) as recorded:
            with self.assertRaises(ValueError):
                self.open(support.TESTFN, 'w', newline='invalid')
            support.gc_collect()
        self.assertEqual(recorded, [])
651
Hynek Schlawack877effc2012-05-25 09:24:18 +0200652
class CIOTest(IOTest):
    # IOTest plus one regression test.  The statement order in that test is
    # part of the scenario from the issue and must not be rearranged.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        # Tie the instance into a cycle with itself.
        obj.obj = obj
        wr = weakref.ref(obj)
        # Drop both the class and the instance before collecting.
        del MyIO
        del obj
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000672
class PyIOTest(IOTest):
    # IOTest with test_array_writes replaced by a skipped version; the skip
    # message states the reason.
    test_array_writes = unittest.skip(
            "len(array.array) returns number of elements rather than bytelength"
    )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000677
678
Antoine Pitrou19690592009-06-12 20:14:08 +0000679class CommonBufferedTests:
680 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
681
682 def test_detach(self):
683 raw = self.MockRawIO()
684 buf = self.tp(raw)
685 self.assertIs(buf.detach(), raw)
686 self.assertRaises(ValueError, buf.detach)
687
Benjamin Peterson53ae6142014-12-21 20:51:50 -0600688 repr(buf) # Should still work
689
Antoine Pitrou19690592009-06-12 20:14:08 +0000690 def test_fileno(self):
691 rawio = self.MockRawIO()
692 bufio = self.tp(rawio)
693
Ezio Melotti2623a372010-11-21 13:34:58 +0000694 self.assertEqual(42, bufio.fileno())
Antoine Pitrou19690592009-06-12 20:14:08 +0000695
    # Placeholder: always skipped, and the body does nothing.
    @unittest.skip('test having existential crisis')
    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass
701
702 def test_invalid_args(self):
703 rawio = self.MockRawIO()
704 bufio = self.tp(rawio)
705 # Invalid whence
706 self.assertRaises(ValueError, bufio.seek, 0, -1)
707 self.assertRaises(ValueError, bufio.seek, 0, 3)
708
709 def test_override_destructor(self):
710 tp = self.tp
711 record = []
712 class MyBufferedIO(tp):
713 def __del__(self):
714 record.append(1)
715 try:
716 f = super(MyBufferedIO, self).__del__
717 except AttributeError:
718 pass
719 else:
720 f()
721 def close(self):
722 record.append(2)
723 super(MyBufferedIO, self).close()
724 def flush(self):
725 record.append(3)
726 super(MyBufferedIO, self).flush()
727 rawio = self.MockRawIO()
728 bufio = MyBufferedIO(rawio)
729 writable = bufio.writable()
730 del bufio
731 support.gc_collect()
732 if writable:
733 self.assertEqual(record, [1, 2, 3])
734 else:
735 self.assertEqual(record, [1, 2])
736
737 def test_context_manager(self):
738 # Test usability as a context manager
739 rawio = self.MockRawIO()
740 bufio = self.tp(rawio)
741 def _with():
742 with bufio:
743 pass
744 _with()
745 # bufio should now be closed, and using it a second time should raise
746 # a ValueError.
747 self.assertRaises(ValueError, _with)
748
749 def test_error_through_destructor(self):
750 # Test that the exception state is not modified by a destructor,
751 # even if close() fails.
752 rawio = self.CloseFailureIO()
753 def f():
754 self.tp(rawio).xyzzy
755 with support.captured_output("stderr") as s:
756 self.assertRaises(AttributeError, f)
757 s = s.getvalue().strip()
758 if s:
759 # The destructor *may* have printed an unraisable error, check it
760 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000761 self.assertTrue(s.startswith("Exception IOError: "), s)
762 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000763
764 def test_repr(self):
765 raw = self.MockRawIO()
766 b = self.tp(raw)
767 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
768 self.assertEqual(repr(b), "<%s>" % clsname)
769 raw.name = "dummy"
770 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
771 raw.name = b"dummy"
772 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000773
def test_flush_error_on_close(self):
    # Test that buffered file is closed despite failed flush
    # and that flush() is called before file closed.
    raw = self.MockRawIO()
    states_at_flush = []

    def failing_flush():
        # Snapshot both ``closed`` flags as they are when flush() runs.
        states_at_flush[:] = [buffered.closed, raw.closed]
        raise IOError()

    raw.flush = failing_flush
    buffered = self.tp(raw)
    with self.assertRaises(IOError):  # exception not swallowed
        buffered.close()
    self.assertTrue(buffered.closed)
    self.assertTrue(raw.closed)
    self.assertTrue(states_at_flush)  # flush() called
    buffered_was_closed, raw_was_closed = states_at_flush
    self.assertFalse(buffered_was_closed)  # flush() called before file closed
    self.assertFalse(raw_was_closed)
    raw.flush = lambda: None  # break reference loop
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600791
def test_close_error_on_close(self):
    # With both flush() and the raw close() rigged to fail, close() must
    # report the error raised by the raw close() and the buffered object
    # must still report itself as open.
    def failing_flush():
        raise IOError('flush')

    def failing_close():
        raise IOError('close')

    raw = self.MockRawIO()
    raw.close = failing_close
    buffered = self.tp(raw)
    buffered.flush = failing_flush
    with self.assertRaises(IOError) as caught:  # exception not swallowed
        buffered.close()
    self.assertEqual(caught.exception.args, ('close',))
    self.assertFalse(buffered.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000805
def test_multi_close(self):
    # close() may be called repeatedly without error; flush() on the
    # closed object raises ValueError.
    buffered = self.tp(self.MockRawIO())
    for _ in range(3):
        buffered.close()
    with self.assertRaises(ValueError):
        buffered.flush()
813
def test_readonly_attributes(self):
    # The ``raw`` attribute cannot be rebound; either AttributeError or
    # TypeError is accepted.
    original_raw = self.MockRawIO()
    buffered = self.tp(original_raw)
    replacement_raw = self.MockRawIO()
    with self.assertRaises((AttributeError, TypeError)):
        buffered.raw = replacement_raw
820
Christian Heimes1a6387e2008-03-26 12:49:49 +0000821
class SizeofTest:
    # Mixin: expects the host class to provide ``tp`` and ``MockRawIO``.

    @support.cpython_only
    def test_sizeof(self):
        # sys.getsizeof() must grow by exactly the difference between two
        # buffer sizes, i.e. the fixed overhead is the same for both objects.
        small_buffer = 4096
        large_buffer = 8192
        small = self.tp(self.MockRawIO(), buffer_size=small_buffer)
        overhead = sys.getsizeof(small) - small_buffer
        large = self.tp(self.MockRawIO(), buffer_size=large_buffer)
        self.assertEqual(sys.getsizeof(large), overhead + large_buffer)
834
835
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Tests run against every BufferedReader implementation; subclasses
    # pick the type under test through the ``tp`` attribute.
    read_mode = "rb"

    def test_constructor(self):
        # __init__() is re-run on the same object with several buffer
        # sizes; non-positive sizes must raise ValueError.
        raw = self.MockRawIO([b"abc"])
        reader = self.tp(raw)
        reader.__init__(raw)
        for size in (1024, 16):
            reader.__init__(raw, buffer_size=size)
        self.assertEqual(b"abc", reader.read())
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                reader.__init__(raw, buffer_size=bad_size)
        raw = self.MockRawIO([b"abc"])
        reader.__init__(raw)
        self.assertEqual(b"abc", reader.read())

    def test_uninitialized(self):
        # Dropping an object on which __init__() never ran must be harmless.
        reader = self.tp.__new__(self.tp)
        del reader
        reader = self.tp.__new__(self.tp)
        # Reading before __init__() fails with one of two accepted errors.
        with self.assertRaisesRegexp((ValueError, AttributeError),
                                     'uninitialized|has no attribute'):
            reader.read(0)
        reader.__init__(self.MockRawIO())
        self.assertEqual(reader.read(0), b'')

    def test_read(self):
        # read(None) and read(7) both return all seven bytes.
        for size in (None, 7):
            raw = self.MockRawIO((b"abc", b"d", b"efg"))
            reader = self.tp(raw)
            self.assertEqual(b"abcdefg", reader.read(size))
        # Invalid args
        with self.assertRaises(ValueError):
            reader.read(-2)

    def test_read1(self):
        raw = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        self.assertEqual(b"a", reader.read(1))
        # Each step: size passed to read1(), expected data, and the value
        # raw._reads is expected to have afterwards.
        steps = [
            (1, b"b", 1),
            (100, b"c", 1),
            (100, b"d", 2),
            (100, b"efg", 3),
            (100, b"", 4),
        ]
        for size, expected, raw_reads in steps:
            self.assertEqual(expected, reader.read1(size))
            self.assertEqual(raw._reads, raw_reads)
        # Invalid args
        with self.assertRaises(ValueError):
            reader.read1(-1)

    def test_readinto(self):
        raw = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        target = bytearray(2)
        # Each step: expected return value and expected buffer contents.
        # A short read leaves the tail of the buffer untouched.
        steps = [
            (2, b"ab"),
            (2, b"cd"),
            (2, b"ef"),
            (1, b"gf"),
            (0, b"gf"),
        ]
        for nread, contents in steps:
            self.assertEqual(reader.readinto(target), nread)
            self.assertEqual(target, contents)

    def test_readlines(self):
        # A fresh reader is built for every call so each starts at offset 0.
        def fresh_reader():
            return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))

        every_line = [b"abc\n", b"d\n", b"ef"]
        self.assertEqual(fresh_reader().readlines(), every_line)
        self.assertEqual(fresh_reader().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(fresh_reader().readlines(None), every_line)

    def test_buffering(self):
        payload = b"abcdefghi"
        total = len(payload)

        # Each case: buffer size, the sizes passed to the buffered read(),
        # and the expected contents of the raw object's read_history.
        cases = [
            [100, [3, 1, 4, 8], [total, 0]],
            [100, [3, 3, 3], [total]],
            [4, [1, 2, 4, 2], [4, 4, 1]],
        ]

        for bufsize, buffered_reads, expected_history in cases:
            raw = self.MockFileIO(payload)
            reader = self.tp(raw, buffer_size=bufsize)
            offset = 0
            for nbytes in buffered_reads:
                self.assertEqual(reader.read(nbytes),
                                 payload[offset:offset + nbytes])
                offset += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(raw.read_history, expected_history)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        raw = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        reader = self.tp(raw)
        self.assertEqual(b"abcd", reader.read(6))
        self.assertEqual(b"e", reader.read(1))
        self.assertEqual(b"fg", reader.read())
        self.assertEqual(b"", reader.peek(1))
        self.assertIsNone(reader.read())
        self.assertEqual(b"", reader.read())

        # The raw object's own readall() is checked the same way.
        raw = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", raw.readall())
        self.assertIsNone(raw.readall())

    def test_read_past_eof(self):
        # Asking for far more than is available returns what there is.
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", reader.read(9000))

    def test_read_all(self):
        # read() without a size returns everything.
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", reader.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            byte_values = list(range(256)) * N
            random.shuffle(byte_values)
            payload = bytes(bytearray(byte_values))
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode,
                           buffering=0) as raw:
                reader = self.tp(raw, 8)
                errors = []
                chunks = []

                def worker():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for size in cycle([1, 19]):
                            chunk = reader.read(size)
                            if not chunk:
                                break
                            # list.append() is atomic
                            chunks.append(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise

                threads = [threading.Thread(target=worker)
                           for _ in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02)  # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(chunks)
                # Every byte value must have been read exactly N times.
                for value in range(256):
                    needle = bytes(bytearray([value]))
                    self.assertEqual(combined.count(needle), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # seek() and tell() on top of a misbehaving raw object raise IOError.
        reader = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
        with self.assertRaises(IOError):
            reader.seek(0)
        with self.assertRaises(IOError):
            reader.tell()

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            wanted = b"x" * n
            # Simple case: one raw read is enough to satisfy the request.
            raw = self.MockRawIO([wanted])
            reader = self.tp(raw, bufsize)
            self.assertEqual(reader.read(n), wanted)
            self.assertEqual(raw._extraneous_reads, 0,
                "failed for {}: {} != 0".format(n, raw._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            raw = self.MockRawIO([b"x" * (n - 1), b"x"])
            reader = self.tp(raw, bufsize)
            self.assertEqual(reader.read(n), wanted)
            self.assertEqual(raw._extraneous_reads, 0,
                "failed for {}: {} != 0".format(n, raw._extraneous_reads))
1024
1025
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the shared reader tests against io.BufferedReader and adds
    # checks specific to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed __init__() the object must refuse to read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # The scratch file is created by FileIO below; register its removal
        # first so it is deleted whether or not the assertion passes.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Self-reference: only the cycle collector can free the object.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertTrue(wr() is None, wr)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegexp(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1071
1072
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared reader tests against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001075
1076
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Tests run against every BufferedWriter implementation; subclasses
    # pick the type under test through the ``tp`` attribute.
    write_mode = "wb"

    def test_constructor(self):
        # __init__() is re-run on the same object with several buffer sizes;
        # non-positive sizes raise ValueError, and everything written in
        # between must still end up in the raw object.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_uninitialized(self):
        # Dropping an object on which __init__() never ran must be harmless.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        # Writing before __init__() fails with one of two accepted errors.
        self.assertRaisesRegexp((ValueError, AttributeError),
                                'uninitialized|has no attribute',
                                bufio.write, b'')
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.write(b''), 0)

    def test_detach_flush(self):
        # detach() must push buffered data down to the raw object.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Helper, not a test: performs lots of writes of growing size,
        # calling intermediate_func(bufio) after each one, then checks that
        # the flushed output equals what was written.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Clamp the final write so it does not run past the payload.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents,
            b"".join(writer._write_stack))

    def test_writes(self):
        # Plain writes with nothing done in between.
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        # An explicit flush() after every write.
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seeking around and back to the current position after every write
        # must not disturb the output; done with absolute, then relative,
        # seeks.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        # Truncating at the current position after every write.
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Overwrite the start of already-written data after seeking back,
        # then append at the old end.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        # flush() hands the buffered bytes to the raw object.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # Same as test_writelines, but with a UserList instead of a list.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        # Non-bytes items and a non-iterable argument raise TypeError.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)

    def test_destructor(self):
        # Destroying the writer must flush what is still buffered.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        #
        # This test creates support.TESTFN; register its removal up front so
        # the file is deleted whether or not the assertions below pass.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            # Pre-slice the payload into alternating 1- and 19-byte chunks.
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Chunks may land in any order, but each byte value must occur
            # exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # seek(), tell() and an overflowing write() on top of a misbehaving
        # raw object raise IOError.
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument triggers a DeprecationWarning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        # A raw write() failure during close() propagates, and the buffered
        # object still ends up closed.
        raw = self.MockRawIO()
        def bad_write(b):
            raise IOError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(IOError, b.close) # exception not swallowed
        self.assertTrue(b.closed)
1332
Antoine Pitrou19690592009-06-12 20:14:08 +00001333
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared writer tests against io.BufferedWriter and adds
    # checks specific to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed __init__() the object must refuse to write.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # The scratch file is created by FileIO below; register its removal
        # first so it is deleted whether or not the assertions pass.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Self-reference: only the cycle collector can free the object.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1376
Antoine Pitrou19690592009-06-12 20:14:08 +00001377
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared writer tests against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001380
1381class BufferedRWPairTest(unittest.TestCase):
1382
def test_constructor(self):
    # A freshly built pair reports itself as open.
    reader_raw = self.MockRawIO()
    writer_raw = self.MockRawIO()
    rw_pair = self.tp(reader_raw, writer_raw)
    self.assertFalse(rw_pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001386
def test_uninitialized(self):
    # Dropping a pair on which __init__() never ran must be harmless.
    rw_pair = self.tp.__new__(self.tp)
    del rw_pair
    rw_pair = self.tp.__new__(self.tp)
    # Both directions fail before __init__() with one of two accepted
    # errors.
    accepted = (ValueError, AttributeError)
    pattern = 'uninitialized|has no attribute'
    with self.assertRaisesRegexp(accepted, pattern):
        rw_pair.read(0)
    with self.assertRaisesRegexp(accepted, pattern):
        rw_pair.write(b'')
    rw_pair.__init__(self.MockRawIO(), self.MockRawIO())
    self.assertEqual(rw_pair.read(0), b'')
    self.assertEqual(rw_pair.write(b''), 0)
1400
def test_detach(self):
    # detach() is not supported on a reader/writer pair.
    rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
    with self.assertRaises(self.UnsupportedOperation):
        rw_pair.detach()
1404
def test_constructor_max_buffer_size_deprecation(self):
    # Passing a fourth positional argument triggers a DeprecationWarning.
    expected_warning = ("max_buffer_size is deprecated", DeprecationWarning)
    with support.check_warnings(expected_warning):
        self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001409
def test_constructor_with_not_readable(self):
    # A reader argument that reports itself unreadable is rejected.
    class NotReadable(MockRawIO):
        def readable(self):
            return False

    unreadable = NotReadable()
    writer_raw = self.MockRawIO()
    with self.assertRaises(IOError):
        self.tp(unreadable, writer_raw)
1416
def test_constructor_with_not_writeable(self):
    # A writer argument that reports itself unwritable is rejected.
    class NotWriteable(MockRawIO):
        def writable(self):
            return False

    reader_raw = self.MockRawIO()
    unwritable = NotWriteable()
    with self.assertRaises(IOError):
        self.tp(reader_raw, unwritable)
1423
def test_read(self):
    rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

    # Successive reads: sized, sized, then "everything that is left".
    steps = [
        ((3,), b"abc"),
        ((1,), b"d"),
        ((), b"ef"),
    ]
    for args, expected in steps:
        self.assertEqual(rw_pair.read(*args), expected)
    # An explicit None size also means "read everything".
    rw_pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
    self.assertEqual(rw_pair.read(None), b"abc")
1432
def test_readlines(self):
    # Exercises readlines() with no hint, an explicit None hint and a
    # byte-count hint.  A fresh pair is built for every call so each one
    # starts reading at offset 0.
    pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
    self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
    # hint=None must behave like omitting the hint altogether.
    self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
    # The hint of 5 is reached partway through the second line, so the
    # third line is not returned.
    self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001438
def test_read1(self):
    # .read1() is delegated to the underlying reader object, so this test
    # can be shallow.
    source = self.BytesIO(b"abcdef")
    rw_pair = self.tp(source, self.MockRawIO())

    self.assertEqual(rw_pair.read1(3), b"abc")
1445
def test_readinto(self):
    # readinto() fills the whole 5-byte buffer from the 6 bytes available.
    source = self.BytesIO(b"abcdef")
    rw_pair = self.tp(source, self.MockRawIO())

    target = bytearray(5)
    self.assertEqual(rw_pair.readinto(target), 5)
    self.assertEqual(target, b"abcde")
1452
def test_write(self):
    # Each write followed by flush() shows up as its own entry in the
    # writer's _write_stack.
    sink = self.MockRawIO()
    rw_pair = self.tp(self.MockRawIO(), sink)

    for chunk in (b"abc", b"def"):
        rw_pair.write(chunk)
        rw_pair.flush()
    self.assertEqual(sink._write_stack, [b"abc", b"def"])
1462
def test_peek(self):
    # peek() may return more than requested, hence the startswith check,
    # and it must not consume what it returns.
    source = self.BytesIO(b"abcdef")
    rw_pair = self.tp(source, self.MockRawIO())

    peeked = rw_pair.peek(3)
    self.assertTrue(peeked.startswith(b"abc"))
    self.assertEqual(rw_pair.read(3), b"abc")
1468
def test_readable(self):
    # A pair built from two mock raw streams reports itself readable.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    self.assertTrue(self.tp(reader, writer).readable())
1472
def test_writeable(self):
    # A pair built from two mock raw streams reports itself writable.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    self.assertTrue(self.tp(reader, writer).writable())
1476
def test_seekable(self):
    # BufferedRWPairs are never seekable, even if their readers and
    # writers are.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    self.assertFalse(self.tp(reader, writer).seekable())
1482
1483 # .flush() is delegated to the underlying writer object and has been
1484 # tested in the test_write method.
1485
def test_close_and_closed(self):
    # The closed attribute flips from False to True across close().
    rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
    self.assertFalse(rw_pair.closed)

    rw_pair.close()
    self.assertTrue(rw_pair.closed)
1491
def test_reader_close_error_on_close(self):
    # The reader's close() fails with NameError (it references an
    # undefined name on purpose).  The error must propagate out of
    # pair.close(), and the writer must still end up closed.
    def reader_close():
        reader_non_existing

    writer = self.MockRawIO()
    reader = self.MockRawIO()
    reader.close = reader_close
    rw_pair = self.tp(reader, writer)

    with self.assertRaises(NameError) as caught:
        rw_pair.close()
    self.assertIn('reader_non_existing', str(caught.exception))
    self.assertTrue(rw_pair.closed)
    self.assertFalse(reader.closed)
    self.assertTrue(writer.closed)
1505
def test_writer_close_error_on_close(self):
    # The writer's close() fails with NameError (it references an
    # undefined name on purpose).  The error must propagate out of
    # pair.close(), and the reader must still end up closed.
    def writer_close():
        writer_non_existing

    reader = self.MockRawIO()
    writer = self.MockRawIO()
    writer.close = writer_close
    rw_pair = self.tp(reader, writer)

    with self.assertRaises(NameError) as caught:
        rw_pair.close()
    self.assertIn('writer_non_existing', str(caught.exception))
    self.assertFalse(rw_pair.closed)
    self.assertTrue(reader.closed)
    self.assertFalse(writer.closed)
1519
def test_reader_writer_close_error_on_close(self):
    # Both halves fail to close.  The exception that surfaces from
    # pair.close() is the reader's, and nothing is reported as closed.
    def reader_close():
        reader_non_existing

    def writer_close():
        writer_non_existing

    reader = self.MockRawIO()
    writer = self.MockRawIO()
    reader.close = reader_close
    writer.close = writer_close
    rw_pair = self.tp(reader, writer)

    with self.assertRaises(NameError) as caught:
        rw_pair.close()
    self.assertIn('reader_non_existing', str(caught.exception))
    for stream in (rw_pair, reader, writer):
        self.assertFalse(stream.closed)
1536
def test_isatty(self):
    # Raw stream whose isatty() answer is chosen at construction time.
    class SelectableIsAtty(MockRawIO):
        def __init__(self, isatty):
            MockRawIO.__init__(self)
            self._isatty = isatty

        def isatty(self):
            return self._isatty

    def make_pair(reader_tty, writer_tty):
        return self.tp(SelectableIsAtty(reader_tty),
                       SelectableIsAtty(writer_tty))

    # Only a pair with no tty on either side reports False ...
    self.assertFalse(make_pair(False, False).isatty())
    # ... a tty on the reader side, the writer side, or both gives True.
    for reader_tty, writer_tty in ((True, False), (False, True),
                                   (True, True)):
        self.assertTrue(make_pair(reader_tty, writer_tty).isatty())
1557
def test_weakref_clearing(self):
    # Drop the pair first and the weak reference to it afterwards.
    rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
    wref = weakref.ref(rw_pair)
    rw_pair = None
    wref = None  # Shouldn't segfault.
1563
class CBufferedRWPairTest(BufferedRWPairTest):
    """Run the BufferedRWPair tests against io.BufferedRWPair."""

    tp = io.BufferedRWPair
1566
class PyBufferedRWPairTest(BufferedRWPairTest):
    """Run the BufferedRWPair tests against pyio.BufferedRWPair."""

    tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001569
1570
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    """Tests for a buffered object that is both readable and writable.

    The type under test is ``self.tp``.  The reader and writer test suites
    are inherited; the methods below add checks for interleaved reads,
    writes, seeks and flushes on the same object.
    """
    # File modes used by the inherited tests for reading and writing.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Both bases define test_constructor; run each one explicitly.
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_uninitialized(self):
        # Both bases define test_uninitialized; run each one explicitly.
        BufferedReaderTest.test_uninitialized(self)
        BufferedWriterTest.test_uninitialized(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        # Nothing has reached the raw stream yet: the writes are buffered.
        self.assertFalse(raw._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        # By now both writes have reached the raw stream as a single chunk.
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite four bytes starting at position 4 ("ghjk" -> "123f").
        rw.write(b"123f")
        rw.seek(0, 0)
        self.assertEqual(b"asdf123fl", rw.read())
        self.assertEqual(9, rw.tell())
        # whence=2: relative to the end of the stream.
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        # whence=1: relative to the current position.
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        rw.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        # A float offset is rejected.
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario for the test_flush_and_* tests below.  read_func
        # is called as read_func(bufio, n) or read_func(bufio) and must
        # return the bytes it consumed.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        # The write lands at position 2, so the next read resumes at 4.
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Modify the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # Without an explicit size, use a buffer far larger than the
            # test data so everything remaining is read.
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance the position, so do it by hand.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        # Two flushed writes overwrite the first five bytes of the stream.
        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        # Both bases define test_threads; run each one explicitly.
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        # check_writes is inherited; the callback passed to it performs a
        # peek, here in two variants.
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        # Second variant: step back one byte, peek, then restore the
        # original position.
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        # Callback: step back one byte and read it again.
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        # Callback: step back one byte and read it again with read1().
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        # Callback: step back one byte and read it again with readinto().
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        # One overwrite smaller than the 4-byte buffer, one larger.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        b = b"\x80\x81\x82\x83\x84"
        for i in range(0, len(b)):
            for j in range(i, len(b)):
                raw = self.BytesIO(b)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                # Position j receives 2 first, then position i receives 1;
                # when i == j the later write (1) wins.
                expected = bytearray(b)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        # truncate() without an argument must use the logical position,
        # whatever the read or write buffer currently holds.
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA")  # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2)  # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        # Both bases define test_misbehaved_io; run each one explicitly.
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        # Alternate one-byte writes with each kind of one-byte read; every
        # operation must act on the position left by the previous one.
        with self.BytesIO(b'abcdefgh') as raw:
            with self.tp(raw, 100) as f:
                f.write(b"1")
                self.assertEqual(f.read(1), b'b')
                f.write(b'2')
                self.assertEqual(f.read1(1), b'd')
                f.write(b'3')
                buf = bytearray(1)
                f.readinto(buf)
                self.assertEqual(buf, b'f')
                f.write(b'4')
                self.assertEqual(f.peek(1), b'h')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        # Same idea, but starting with a read instead of a write.
        with self.BytesIO(b'abc') as raw:
            with self.tp(raw, 100) as f:
                self.assertEqual(f.read(1), b'a')
                f.write(b"2")
                self.assertEqual(f.read(1), b'c')
                f.flush()
                self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Each one-byte write replaces the first byte of a line; the
        # readline() that follows returns the rest of that line.
        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
            with self.tp(raw) as f:
                f.write(b'1')
                self.assertEqual(f.readline(), b'b\n')
                f.write(b'2')
                self.assertEqual(f.readline(), b'def\n')
                f.write(b'3')
                self.assertEqual(f.readline(), b'\n')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1795
R David Murray5b2cf5e2013-02-23 22:11:21 -05001796
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
                          BufferedRandomTest, SizeofTest):
    """Run the BufferedRandom tests against io.BufferedRandom."""

    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors,
                          bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Both C bases define test_garbage_collection; run each explicitly.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError message must name
        # BufferedRandom.
        with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1819
1820
class PyBufferedRandomTest(BufferedRandomTest):
    """Run the BufferedRandom tests against pyio.BufferedRandom."""

    tp = pyio.BufferedRandom
1823
1824
Christian Heimes1a6387e2008-03-26 12:49:49 +00001825# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1826# properties:
1827# - A single output character can correspond to many bytes of input.
1828# - The number of input bytes to complete the character can be
1829# undetermined until the last input byte is received.
1830# - The number of input bytes can vary depending on previous input.
1831# - A single input byte can correspond to many characters of output.
1832# - The number of output characters can be undetermined until the
1833# last input byte is received.
1834# - The number of output characters can vary depending on previous input.
1835
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags); flags pack I and O in one int."""
        i, o = self.i ^ 1, self.o ^ 1  # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously returned by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed *input* (a bytes-like object) and return the decoded text.

        With a true *final*, a word still sitting in the buffer is
        emitted as well.
        """
        output = ''
        period = ord('.')
        # Iterating a bytearray yields integers whatever bytes-like object
        # was passed in, so the period test below works for str/bytes and
        # bytearray input alike (comparing against '.' only matched when
        # iteration produced one-character strings).
        for b in bytearray(input):
            if self.i == 0:  # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer:  # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i' and 'o' words only update the settings and produce no output.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o  # pad out with hyphens
            if self.o:
                output = output[:self.o]  # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # The codec lookup below only answers while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the 'test_decoder' codec.

        Returns a CodecInfo that encodes like latin-1 and decodes
        incrementally with this class, or None when the codec is disabled
        or *name* is a different codec.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
1920
# Register the decoder above with the codecs machinery under the name
# 'test_decoder'.  The lookup function only answers while
# StatefulIncrementalDecoder.codecEnabled is true, so the codec is
# disabled by default; tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1924
1925
Christian Heimes1a6387e2008-03-26 12:49:49 +00001926class StatefulIncrementalDecoderTest(unittest.TestCase):
1927 """
1928 Make sure the StatefulIncrementalDecoder actually works.
1929 """
1930
1931 test_cases = [
1932 # I=1, O=1 (fixed-length input == fixed-length output)
1933 (b'abcd', False, 'a.b.c.d.'),
1934 # I=0, O=0 (variable-length input, variable-length output)
1935 (b'oiabcd', True, 'abcd.'),
1936 # I=0, O=0 (should ignore extra periods)
1937 (b'oi...abcd...', True, 'abcd.'),
1938 # I=0, O=6 (variable-length input, fixed-length output)
1939 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1940 # I=2, O=6 (fixed-length input < fixed-length output)
1941 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1942 # I=6, O=3 (fixed-length input > fixed-length output)
1943 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1944 # I=0, then 3; O=29, then 15 (with longer output)
1945 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1946 'a----------------------------.' +
1947 'b----------------------------.' +
1948 'cde--------------------------.' +
1949 'abcdefghijabcde.' +
1950 'a.b------------.' +
1951 '.c.------------.' +
1952 'd.e------------.' +
1953 'k--------------.' +
1954 'l--------------.' +
1955 'm--------------.')
1956 ]
1957
Antoine Pitrou19690592009-06-12 20:14:08 +00001958 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001959 # Try a few one-shot test cases.
1960 for input, eof, output in self.test_cases:
1961 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001962 self.assertEqual(d.decode(input, eof), output)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001963
1964 # Also test an unfinished decode, followed by forcing EOF.
1965 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001966 self.assertEqual(d.decode(b'oiabcd'), '')
1967 self.assertEqual(d.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001968
1969class TextIOWrapperTest(unittest.TestCase):
1970
def setUp(self):
    # Start from a clean slate: remove any leftover scratch file.
    support.unlink(support.TESTFN)
    # Sample bytes mixing "\r\n", "\r" and "\n" line endings, and the
    # same text with every line ending normalized to "\n".
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001975
def tearDown(self):
    # Remove the scratch file a test may have created.
    scratch_path = support.TESTFN
    support.unlink(scratch_path)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001978
def test_constructor(self):
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)

    # Calling __init__ again must replace the previous settings.
    text.__init__(buffered, encoding="latin1", newline="\r\n")
    self.assertEqual(text.encoding, "latin1")
    self.assertEqual(text.line_buffering, False)

    text.__init__(buffered, encoding="utf8", line_buffering=True)
    self.assertEqual(text.encoding, "utf8")
    self.assertEqual(text.line_buffering, True)
    # The two raw bytes decode to one character under utf8.
    self.assertEqual("\xe9\n", text.readline())

    # Bad newline arguments: wrong type first, then wrong value.
    for exc_type, bad_newline in ((TypeError, 42), (ValueError, 'xyzzy')):
        self.assertRaises(exc_type, text.__init__, buffered,
                          newline=bad_newline)
1992
def test_uninitialized(self):
    wrapper_cls = self.TextIOWrapper

    # Destroying an instance that never went through __init__ must work.
    wrapper = wrapper_cls.__new__(wrapper_cls)
    del wrapper

    # Using such an instance must fail with an exception, not crash.
    wrapper = wrapper_cls.__new__(wrapper_cls)
    self.assertRaises(Exception, repr, wrapper)
    self.assertRaisesRegexp((ValueError, AttributeError),
                            'uninitialized|has no attribute',
                            wrapper.read, 0)

    # Once initialized, the same object becomes usable.
    wrapper.__init__(self.MockRawIO())
    self.assertEqual(wrapper.read(0), u'')
2003
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)

    # detach() hands back the very buffer object that was wrapped.
    text = self.TextIOWrapper(buffered)
    self.assertIs(text.detach(), buffered)

    text = self.TextIOWrapper(buffered, encoding="ascii")
    text.write("howdy")
    # Nothing has reached the raw stream before detaching ...
    self.assertFalse(raw.getvalue())
    text.detach()
    # ... and detaching pushed the pending text through.
    self.assertEqual(raw.getvalue(), b"howdy")
    # A second detach() is an error.
    self.assertRaises(ValueError, text.detach)

    # Operations independent of the detached stream should still work
    repr(text)
    self.assertEqual(text.encoding, "ascii")
    self.assertEqual(text.errors, "strict")
    self.assertFalse(text.line_buffering)
2022
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    # Without a name on the raw stream only the encoding is shown.
    expected = "<%s.TextIOWrapper encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    # A unicode name is shown with its u'' prefix ...
    raw.name = "dummy"
    expected = "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    # ... and a bytes name without it.
    raw.name = b"dummy"
    expected = "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    text.buffer.detach()
    repr(text)  # Should not raise an exception
2039
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)

    text.write("X")
    self.assertEqual(raw.getvalue(), b"")  # No flush happened
    text.write("Y\nZ")
    self.assertEqual(raw.getvalue(), b"XY\nZ")  # All got flushed
    # A write containing "\r" gets flushed as well.
    text.write("A\rB")
    self.assertEqual(raw.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002050
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()

    # An explicit encoding is reported back unchanged.
    text = self.TextIOWrapper(raw, encoding="utf8")
    self.assertEqual(text.encoding, "utf8")

    # With no encoding argument a default must still be filled in, and
    # it must name a codec that really exists (lookup() raises
    # LookupError otherwise).
    text = self.TextIOWrapper(raw)
    self.assertIsNotNone(text.encoding)
    codecs.lookup(text.encoding)
2059
def test_encoding_errors_reading(self):
    payload = b"abc\n\xff\n"

    def wrap(**kwargs):
        # Every case gets its own stream over the same undecodable data.
        return self.TextIOWrapper(self.BytesIO(payload), encoding="ascii",
                                  **kwargs)

    # (1) default
    text = wrap()
    self.assertRaises(UnicodeError, text.read)
    # (2) explicit strict
    text = wrap(errors="strict")
    self.assertRaises(UnicodeError, text.read)
    # (3) ignore
    text = wrap(errors="ignore")
    self.assertEqual(text.read(), "abc\n\n")
    # (4) replace
    text = wrap(errors="replace")
    self.assertEqual(text.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002077
def test_encoding_errors_writing(self):
    def new_pair(**kwargs):
        # Fresh byte sink plus an ascii text wrapper around it.
        sink = self.BytesIO()
        return sink, self.TextIOWrapper(sink, encoding="ascii", **kwargs)

    # (1) default
    sink, text = new_pair()
    self.assertRaises(UnicodeError, text.write, "\xff")
    # (2) explicit strict
    sink, text = new_pair(errors="strict")
    self.assertRaises(UnicodeError, text.write, "\xff")
    # (3) ignore
    sink, text = new_pair(errors="ignore", newline="\n")
    text.write("abc\xffdef\n")
    text.flush()
    self.assertEqual(sink.getvalue(), b"abcdef\n")
    # (4) replace
    sink, text = new_pair(errors="replace", newline="\n")
    text.write("abc\xffdef\n")
    text.flush()
    self.assertEqual(sink.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002101
def test_newlines(self):
    input_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]
    joined = ''.join(input_lines)

    # [newline argument, lines expected back] pairs.
    tests = [
        [None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']],
        ['', input_lines],
        ['\n', ["unix\n", "windows\r\n", "os9\rlast\n", "nonl"]],
        ['\r\n', ["unix\nwindows\r\n", "os9\rlast\nnonl"]],
        ['\r', ["unix\nwindows\r", "\nos9\r", "last\nnonl"]],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    def read_in_chunks(textio):
        # Alternate a two-character read() with a readline() until EOF.
        lines = []
        head = textio.read(2)
        while head != '':
            self.assertEqual(len(head), 2)
            lines.append(head + textio.readline())
            head = textio.read(2)
        return lines

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        # XXX: str.encode() should return bytes
        data = bytes(joined.encode(encoding))
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        got_lines = read_in_chunks(textio)
                    else:
                        got_lines = list(textio)

                    # Compare line by line first, then the line counts.
                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002143
def test_newlines_input(self):
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")

    # (newline argument, lines readlines() must return) pairs.
    cases = [
        (None, normalized.decode("ascii").splitlines(True)),
        ("", testdata.decode("ascii").splitlines(True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        raw = self.BytesIO(testdata)
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        self.assertEqual(text.readlines(), expected)
        # After rewinding, a plain read() must give the same text in
        # one piece.
        text.seek(0)
        self.assertEqual(text.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002159
def test_newlines_output(self):
    # Bytes expected on output for each explicit newline argument.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is expected to behave like the platform's os.linesep.
    tests = [(None, testdict[os.linesep])]
    tests.extend(sorted(testdict.items()))

    chunks = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in tests:
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        for chunk in chunks:
            text.write(chunk)
        text.flush()
        self.assertEqual(raw.closed, False)
        self.assertEqual(raw.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00002177
def test_destructor(self):
    # Destroying the wrapper must close the underlying stream, and the
    # pending text must have been written by then.
    seen_at_close = []
    base = self.BytesIO

    class MyBytesIO(base):
        def close(self):
            # Record what the stream holds at the moment it is closed.
            seen_at_close.append(self.getvalue())
            base.close(self)

    raw = MyBytesIO()
    text = self.TextIOWrapper(raw, encoding="ascii")
    text.write("abc")
    del text
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Antoine Pitrou19690592009-06-12 20:14:08 +00002191
def test_override_destructor(self):
    # Overridden __del__, close() and flush() must all be invoked, in
    # that order, when the object is destroyed.
    calls = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The base class may or may not define __del__.
            try:
                parent_del = super(MyTextIO, self).__del__
            except AttributeError:
                pass
            else:
                parent_del()

        def close(self):
            calls.append(2)
            super(MyTextIO, self).close()

        def flush(self):
            calls.append(3)
            super(MyTextIO, self).flush()

    raw = self.BytesIO()
    text = MyTextIO(raw, encoding="ascii")
    del text
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2214
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    failing_raw = self.CloseFailureIO()

    def touch_missing_attribute():
        # The wrapper is a temporary: nothing references it once the
        # attribute lookup has raised.
        self.TextIOWrapper(failing_raw).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    report = captured.getvalue().strip()
    if not report:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(report.splitlines()), 1)
    self.assertTrue(report.startswith("Exception IOError: "), report)
    self.assertTrue(report.endswith(" ignored"), report)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002229
2230 # Systematic tests of the text I/O API
2231
def test_basic_io(self):
    # Write a short string, reopen the file and exercise read(), seek(),
    # tell() and write() for every combination of chunk size and encoding.
    chunk_sizes = (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65)
    encodings = ("ascii", "latin1", "utf8")  # , "utf-16-be", "utf-16-le"
    for chunksize in chunk_sizes:
        for enc in encodings:
            writer = self.open(support.TESTFN, "w+", encoding=enc)
            writer._CHUNK_SIZE = chunksize
            self.assertEqual(writer.write("abc"), 3)
            writer.close()

            stream = self.open(support.TESTFN, "r+", encoding=enc)
            stream._CHUNK_SIZE = chunksize
            self.assertEqual(stream.tell(), 0)
            self.assertEqual(stream.read(), "abc")
            # Position just past "abc"; reused below as a seek target.
            end_of_abc = stream.tell()
            self.assertEqual(stream.seek(0), 0)
            self.assertEqual(stream.read(None), "abc")
            stream.seek(0)
            self.assertEqual(stream.read(2), "ab")
            self.assertEqual(stream.read(1), "c")
            self.assertEqual(stream.read(1), "")
            self.assertEqual(stream.read(), "")
            self.assertEqual(stream.tell(), end_of_abc)
            self.assertEqual(stream.seek(0), 0)
            self.assertEqual(stream.seek(0, 2), end_of_abc)
            self.assertEqual(stream.write("def"), 3)
            self.assertEqual(stream.seek(end_of_abc), end_of_abc)
            self.assertEqual(stream.read(), "def")
            if enc.startswith("utf"):
                self.multi_line_test(stream, enc)
            stream.close()
2260
def multi_line_test(self, f, enc):
    # Helper: empty *f*, write lines of many different lengths while
    # recording tell() before each one, then read them back and check
    # that both the positions and the text match.  *enc* is not used.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    lengths = (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000)
    written = []
    for length in lengths:
        # Cycle through the sample characters to build the line body.
        body = "".join([sample[index % len(sample)]
                        for index in range(length)])
        text = body + "\n"
        written.append((f.tell(), text))
        f.write(text)
    f.seek(0)
    read_back = []
    while True:
        position = f.tell()
        text = f.readline()
        if not text:
            break
        read_back.append((position, text))
    self.assertEqual(read_back, written)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002282
def test_telling(self):
    # tell() must report the same positions while reading lines back as
    # it did while they were being written, and must refuse to work in
    # the middle of iteration.
    stream = self.open(support.TESTFN, "w+", encoding="utf8")
    text = "\xff\n"
    marks = [stream.tell()]
    for _ in range(2):
        stream.write(text)
        marks.append(stream.tell())
    stream.seek(0)
    self.assertEqual(stream.tell(), marks[0])
    for mark in marks[1:]:
        self.assertEqual(stream.readline(), text)
        self.assertEqual(stream.tell(), mark)
    stream.seek(0)
    for line in stream:
        self.assertEqual(line, text)
        # tell() is expected to fail while the file is being iterated.
        self.assertRaises(IOError, stream.tell)
    self.assertEqual(stream.tell(), marks[-1])
    stream.close()
2302
def test_seeking(self):
    # Read a prefix that stops two characters short of the default chunk
    # size, then check tell() and the following readline().  The line
    # ends with a multi-byte UTF-8 character placed right after the
    # prefix (presumably so that it straddles the chunk boundary --
    # depends on what _default_chunk_size() returns).
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = bytes(u_suffix.encode("utf-8"))
    line = prefix + suffix
    # Both handles are managed by ``with`` so they are closed even when
    # an assertion fails; the reader used to be left open.
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002320
def test_seeking_too(self):
    # Regression test for a specific bug: tell() after readline() with a
    # chunk size smaller than the multi-byte character being decoded.
    data = b'\xe0\xbf\xbf\n'
    # ``with`` guarantees both handles are closed; the reader used to be
    # left open when the test finished.
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
2332
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def check_seek_and_tell(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        out = self.open(support.TESTFN, 'wb')
        out.write(data)
        out.close()
        # Decode the whole file once to obtain the reference text.
        reference = self.open(support.TESTFN, encoding='test_decoder')
        reference._CHUNK_SIZE = CHUNK_SIZE
        decoded = reference.read()
        reference.close()

        total = len(decoded)
        for start in range(min_pos, total + 1):  # seek positions
            for length in [1, 5, total - start]:  # read lengths
                probe = self.open(support.TESTFN, encoding='test_decoder')
                self.assertEqual(probe.read(start), decoded[:start])
                cookie = probe.tell()
                self.assertEqual(probe.read(length),
                                 decoded[start:start + length])
                probe.seek(cookie)
                self.assertEqual(probe.read(), decoded[start:])
                probe.close()

    cases = StatefulIncrementalDecoderTest.test_cases

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for payload, _, _ in cases:
            check_seek_and_tell(payload)

        # Position each test case so that it crosses a chunk boundary.
        for payload, _, _ in cases:
            offset = CHUNK_SIZE - len(payload)//2
            padding = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            check_seek_and_tell(padding + payload, offset*2)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002379
def test_encoded_writes(self):
    # Two consecutive writes through a BOM-producing codec must read
    # back as plain text and leave a single BOM in the raw bytes.
    data = "1234567890"
    doubled = data * 2
    encodings = ("utf-16", "utf-16-le", "utf-16-be",
                 "utf-32", "utf-32-le", "utf-32-be")
    for encoding in encodings:
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding=encoding)
        # Check if the BOM is written only once (see issue1753).
        wrapper.write(data)
        wrapper.write(data)
        # Read the text back twice to make sure seeking is repeatable.
        for _ in range(2):
            wrapper.seek(0)
            self.assertEqual(wrapper.read(), doubled)
        self.assertEqual(raw.getvalue(), doubled.encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002399
def test_unreadable(self):
    # read() on a wrapper whose buffer reports itself as not readable
    # must raise IOError.
    class WriteOnlyBytesIO(self.BytesIO):
        def readable(self):
            return False

    wrapper = self.TextIOWrapper(WriteOnlyBytesIO())
    self.assertRaises(IOError, wrapper.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002406
def test_read_one_by_one(self):
    # Reading a single character at a time must still translate the
    # "\r\n" pair into one "\n".
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    # iter() with a sentinel stops at the first empty read (EOF).
    collected = "".join(iter(lambda: wrapper.read(1), ""))
    self.assertEqual(collected, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002416
def test_readlines(self):
    # readlines() with no hint, with None and with a small size hint.
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]
    self.assertEqual(wrapper.readlines(), every_line)
    wrapper.seek(0)
    self.assertEqual(wrapper.readlines(None), every_line)
    wrapper.seek(0)
    # A hint of 5 characters stops after the second line.
    self.assertEqual(wrapper.readlines(5), every_line[:2])
2424
Christian Heimes1a6387e2008-03-26 12:49:49 +00002425 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    wrapper = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    # Keep reading 128 characters at a time until an empty read (EOF).
    collected = "".join(iter(lambda: wrapper.read(128), ""))
    self.assertEqual(collected, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002436
def test_writelines(self):
    # writelines() concatenates the items of a list without separators.
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw)
    wrapper.writelines(['ab', 'cd', 'ef'])
    wrapper.flush()
    self.assertEqual(raw.getvalue(), b'abcdef')
2444
def test_writelines_userlist(self):
    # writelines() must accept a list-like object, not only a real list.
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw)
    wrapper.writelines(UserList(['ab', 'cd', 'ef']))
    wrapper.flush()
    self.assertEqual(raw.getvalue(), b'abcdef')
2452
def test_writelines_error(self):
    # Non-text items, a non-iterable and a bytes object are all rejected
    # with TypeError.
    wrapper = self.TextIOWrapper(self.BytesIO())
    for bad_lines in ([1, 2, 3], None, b'abc'):
        self.assertRaises(TypeError, wrapper.writelines, bad_lines)
2458
def test_issue1395_1(self):
    # Newline translation must be correct when the text is consumed
    # one character at a time.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")

    # read one char at a time
    collected = "".join(iter(lambda: wrapper.read(1), ""))
    self.assertEqual(collected, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002470
def test_issue1395_2(self):
    # Same check with a 4-character chunk size and 4-character reads.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # Keep reading until an empty string signals end of file.
    collected = "".join(iter(lambda: wrapper.read(4), ""))
    self.assertEqual(collected, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002482
def test_issue1395_3(self):
    # Mix two sized reads with three readline() calls on a wrapper that
    # uses a 4-character chunk size.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    parts = [wrapper.read(4), wrapper.read(4)]
    for _ in range(3):
        parts.append(wrapper.readline())
    self.assertEqual("".join(parts), self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002493
def test_issue1395_4(self):
    # A sized read followed by a read-to-EOF must yield the whole text.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    head = wrapper.read(4)
    tail = wrapper.read()
    self.assertEqual(head + tail, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002501
def test_issue1395_5(self):
    # A position obtained from tell() must still be valid after seeking
    # away and back again.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    wrapper.read(4)
    bookmark = wrapper.tell()
    wrapper.seek(0)
    wrapper.seek(bookmark)
    self.assertEqual(wrapper.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002511
def test_issue2282(self):
    # The wrapper must report the same seekable() value as its buffer.
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    self.assertEqual(raw.seekable(), wrapper.seekable())
2517
Antoine Pitrou19690592009-06-12 20:14:08 +00002518 def test_append_bom(self):
2519 # The BOM is not written again when appending to a non-empty file
2520 filename = support.TESTFN
2521 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2522 with self.open(filename, 'w', encoding=charset) as f:
2523 f.write('aaa')
2524 pos = f.tell()
2525 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002526 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002527
2528 with self.open(filename, 'a', encoding=charset) as f:
2529 f.write('xxx')
2530 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002531 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002532
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            end_of_text = writer.tell()
        with self.open(path, 'r+', encoding=charset) as updater:
            # Write past the existing text first, then overwrite it from
            # the start of the file.
            updater.seek(end_of_text)
            updater.write('zzz')
            updater.seek(0)
            updater.write('bbb')
        with self.open(path, 'rb') as reader:
            self.assertEqual(reader.read(), 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002547
def test_errors_property(self):
    # The ``errors`` attribute is "strict" unless another handler is
    # passed to open().
    with self.open(support.TESTFN, "w") as default_file:
        self.assertEqual(default_file.errors, "strict")
    with self.open(support.TESTFN, "w", errors="replace") as lenient_file:
        self.assertEqual(lenient_file.errors, "replace")
2553
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    thread_count = 20
    start_signal = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as shared:
        def worker(number):
            line = "Thread%03d\n" % number
            # Hold every thread back until the start signal is set.
            start_signal.wait()
            shared.write(line)

        workers = [threading.Thread(target=worker, args=(number,))
                   for number in range(thread_count)]
        with support.start_threads(workers, start_signal.set):
            time.sleep(0.02)
    with self.open(support.TESTFN) as result:
        content = result.read()
        # Each thread's line must appear exactly once.
        for number in range(thread_count):
            self.assertEqual(content.count("Thread%03d\n" % number), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002571
def test_flush_error_on_close(self):
    # Test that text file is closed despite failed flush
    # and that flush() is called before file closed.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    # Filled in by failing_flush() with the closed flags seen at that time.
    state_at_flush = []

    def failing_flush():
        state_at_flush[:] = [wrapper.closed, wrapper.buffer.closed]
        raise IOError()

    wrapper.flush = failing_flush
    self.assertRaises(IOError, wrapper.close)  # exception not swallowed
    self.assertTrue(wrapper.closed)
    self.assertTrue(wrapper.buffer.closed)
    self.assertTrue(state_at_flush)  # flush() called
    # flush() called before file closed
    self.assertFalse(state_at_flush[0])
    self.assertFalse(state_at_flush[1])
    wrapper.flush = lambda: None  # break reference loop
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002588
def test_multi_close(self):
    # close() may be called repeatedly; other operations on the closed
    # wrapper raise ValueError.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    for _ in range(3):
        wrapper.close()
    self.assertRaises(ValueError, wrapper.flush)
2595
def test_readonly_attributes(self):
    # The ``buffer`` attribute of a wrapper cannot be reassigned.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    with self.assertRaises((AttributeError, TypeError)):
        wrapper.buffer = replacement
2601
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002602 def test_read_nonbytes(self):
2603 # Issue #17106
2604 # Crash when underlying read() returns non-bytes
2605 class NonbytesStream(self.StringIO):
2606 read1 = self.StringIO.read
2607 class NonbytesStream(self.StringIO):
2608 read1 = self.StringIO.read
2609 t = self.TextIOWrapper(NonbytesStream('a'))
2610 with self.maybeRaises(TypeError):
2611 t.read(1)
2612 t = self.TextIOWrapper(NonbytesStream('a'))
2613 with self.maybeRaises(TypeError):
2614 t.readline()
2615 t = self.TextIOWrapper(NonbytesStream('a'))
2616 self.assertEqual(t.read(), u'a')
2617
def test_illegal_decoder(self):
    # Issue #17106
    # Crash when decoder returns non-string
    def fresh_wrapper():
        return self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
                                  encoding='quopri_codec')

    # Each read path is tried on its own, newly created wrapper.
    read_paths = (
        lambda stream: stream.read(1),
        lambda stream: stream.readline(),
        lambda stream: stream.read(),
    )
    for read_path in read_paths:
        stream = fresh_wrapper()
        with self.maybeRaises(TypeError):
            read_path(stream)
2633
2634
class CTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests plus the checks below; here
    # maybeRaises() is the strict assertRaises().

    def test_initialization(self):
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        reader = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(reader)
        # After a rejected __init__() call the object must refuse to read.
        for error, bad_newline in ((TypeError, 42), (ValueError, 'xyzzy')):
            self.assertRaises(error, wrapper.__init__, reader,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

        # repr() of an object that never went through __init__() must
        # raise rather than succeed.
        uninitialized = self.TextIOWrapper.__new__(self.TextIOWrapper)
        self.assertRaises(Exception, repr, uninitialized)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        raw_file = io.FileIO(support.TESTFN, "wb")
        writer = self.BufferedWriter(raw_file)
        wrapper = self.TextIOWrapper(writer, encoding="ascii")
        wrapper.write("456def")
        # Self-reference: only the cycle collector can free the object.
        wrapper.x = wrapper
        ref = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
        with self.open(support.TESTFN, "rb") as saved:
            self.assertEqual(saved.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            first_pair = self.BufferedRWPair(self.MockRawIO(),
                                             self.MockRawIO())
            first = self.TextIOWrapper(first_pair, encoding="ascii")
            second_pair = self.BufferedRWPair(self.MockRawIO(),
                                              self.MockRawIO())
            second = self.TextIOWrapper(second_pair, encoding="ascii")
            # circular references
            first.buddy = second
            second.buddy = first
        support.gc_collect()

    maybeRaises = unittest.TestCase.assertRaises
2680
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002681
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests with a lenient maybeRaises().

    @contextlib.contextmanager
    def maybeRaises(self, *expected, **options):
        # Accepts the same arguments as assertRaises() but checks nothing:
        # the guarded block simply runs.
        yield
Antoine Pitrou19690592009-06-12 20:14:08 +00002686
2687
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Shared checks for a newline decoder; the class under test is read
    # from self.IncrementalNewlineDecoder.

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def expect(data, text, **kwargs):
            # We exercise getstate() / setstate() as well as decode():
            # decoding again from the restored state must give the same
            # result.
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(data, **kwargs), text)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(data, **kwargs), text)

        def expect_each(steps):
            for data, text in steps:
                expect(data, text)

        # One three-byte character: whole, then twice byte by byte, then
        # left truncated after its first byte.
        expect_each([
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\xe8', ""), (b'\xa2', ""), (b'\x88', "\u8888"),
            (b'\xe8', ""), (b'\xa2', ""), (b'\x88', "\u8888"),
            (b'\xe8', ""),
        ])
        # The pending partial character is an error once input is final.
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'',
                          final=True)

        decoder.reset()
        expect_each([(b'\n', "\n"), (b'\r', "")])
        # A held-back "\r" is released as "\n" when the input is final.
        expect(b'', "\n", final=True)
        expect(b'\r', "\n", final=True)

        expect_each([
            (b'\r', ""), (b'a', "\na"),
            (b'\r\r\n', "\n\n"), (b'\r', ""), (b'\r', "\n"),
            (b'\na', "\na"),
            (b'\xe8\xa2\x88\r\n', "\u8888\n"),
            (b'\xe8\xa2\x88', "\u8888"), (b'\n', "\n"),
            (b'\xe8\xa2\x88\r', "\u8888"), (b'\n', "\n"),
        ])

    def check_newline_decoding(self, decoder, encoding):
        decoded = []
        if encoding is None:
            encoder = None

            def feed(text):
                # Decode one char at a time
                for char in text:
                    decoded.append(decoder.decode(char))
        else:
            encoder = codecs.getincrementalencoder(encoding)()

            def feed(text):
                # Decode one byte at a time
                for byte in encoder.encode(text):
                    decoded.append(decoder.decode(byte))

        # ``newlines`` grows as each kind of line ending is recognised.
        self.assertEqual(decoder.newlines, None)
        feed("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        feed("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        feed("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        feed("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        feed("abc\r")
        self.assertEqual("".join(decoded), "abc\n\nabcabc\nabcabc")
        decoder.reset()
        payload = "abc"
        if encoder is not None:
            encoder.reset()
            payload = encoder.encode(payload)
        self.assertEqual(decoder.decode(payload), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                inner = None
            else:
                inner = codecs.getincrementaldecoder(enc)()
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)
        utf8_inner = codecs.getincrementaldecoder("utf-8")()
        utf8_wrapped = self.IncrementalNewlineDecoder(utf8_inner,
                                                      translate=True)
        self.check_newline_decoding_utf8(utf8_wrapped)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def check(decoder):
            # U+0D00 and U+0A00 are ordinary characters and must not be
            # recorded as newlines.
            self.assertEqual(decoder.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(decoder.decode(char), char)
                self.assertEqual(decoder.newlines, None)

        for translate in (False, True):
            check(self.IncrementalNewlineDecoder(None, translate=translate))
2793
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Adds nothing of its own; inherits every shared decoder test."""
2796
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Adds nothing of its own; inherits every shared decoder test."""
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002799
Christian Heimes1a6387e2008-03-26 12:49:49 +00002800
2801# XXX Tests for open()
2802
2803class MiscIOTest(unittest.TestCase):
2804
def tearDown(self):
    # Remove the scratch file the tests in this class write to.
    scratch_path = support.TESTFN
    support.unlink(scratch_path)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002807
def test___all__(self):
    # Every exported name must exist.  Apart from open() and the SEEK_*
    # constants, each one must be an exception class or an IOBase
    # subclass.
    for name in self.io.__all__:
        obj = getattr(self.io, name, None)
        self.assertTrue(obj is not None, name)
        if name == "open":
            continue
        is_error_name = ("error" in name.lower()
                         or name == "UnsupportedOperation")
        if is_error_name:
            self.assertTrue(issubclass(obj, Exception), name)
            continue
        if name.startswith("SEEK_"):
            continue
        self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002818
def test_attributes(self):
    # Check the ``name`` and ``mode`` attributes reported by each layer
    # of the objects returned by open().
    unbuffered = self.open(support.TESTFN, "wb", buffering=0)
    self.assertEqual(unbuffered.mode, "wb")
    unbuffered.close()

    text_file = self.open(support.TESTFN, "U")
    for layer in (text_file, text_file.buffer, text_file.buffer.raw):
        self.assertEqual(layer.name, support.TESTFN)
    self.assertEqual(text_file.mode, "U")
    self.assertEqual(text_file.buffer.mode, "rb")
    self.assertEqual(text_file.buffer.raw.mode, "rb")
    text_file.close()

    update_file = self.open(support.TESTFN, "w+")
    self.assertEqual(update_file.mode, "w+")
    # Does it really matter?
    self.assertEqual(update_file.buffer.mode, "rb+")
    self.assertEqual(update_file.buffer.raw.mode, "rb+")

    # A file opened from a descriptor reports that descriptor as its name.
    by_descriptor = self.open(update_file.fileno(), "wb", closefd=False)
    self.assertEqual(by_descriptor.mode, "wb")
    self.assertEqual(by_descriptor.raw.mode, "wb")
    self.assertEqual(by_descriptor.name, update_file.fileno())
    self.assertEqual(by_descriptor.raw.name, update_file.fileno())
    update_file.close()
    by_descriptor.close()
2845
Antoine Pitrou19690592009-06-12 20:14:08 +00002846 def test_io_after_close(self):
2847 for kwargs in [
2848 {"mode": "w"},
2849 {"mode": "wb"},
2850 {"mode": "w", "buffering": 1},
2851 {"mode": "w", "buffering": 2},
2852 {"mode": "wb", "buffering": 0},
2853 {"mode": "r"},
2854 {"mode": "rb"},
2855 {"mode": "r", "buffering": 1},
2856 {"mode": "r", "buffering": 2},
2857 {"mode": "rb", "buffering": 0},
2858 {"mode": "w+"},
2859 {"mode": "w+b"},
2860 {"mode": "w+", "buffering": 1},
2861 {"mode": "w+", "buffering": 2},
2862 {"mode": "w+b", "buffering": 0},
2863 ]:
2864 f = self.open(support.TESTFN, **kwargs)
2865 f.close()
2866 self.assertRaises(ValueError, f.flush)
2867 self.assertRaises(ValueError, f.fileno)
2868 self.assertRaises(ValueError, f.isatty)
2869 self.assertRaises(ValueError, f.__iter__)
2870 if hasattr(f, "peek"):
2871 self.assertRaises(ValueError, f.peek, 1)
2872 self.assertRaises(ValueError, f.read)
2873 if hasattr(f, "read1"):
2874 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinner5100a402011-05-25 22:15:36 +02002875 if hasattr(f, "readall"):
2876 self.assertRaises(ValueError, f.readall)
Antoine Pitrou19690592009-06-12 20:14:08 +00002877 if hasattr(f, "readinto"):
2878 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2879 self.assertRaises(ValueError, f.readline)
2880 self.assertRaises(ValueError, f.readlines)
2881 self.assertRaises(ValueError, f.seek, 0)
2882 self.assertRaises(ValueError, f.tell)
2883 self.assertRaises(ValueError, f.truncate)
2884 self.assertRaises(ValueError, f.write,
2885 b"" if "b" in kwargs['mode'] else "")
2886 self.assertRaises(ValueError, f.writelines, [])
2887 self.assertRaises(ValueError, next, f)
2888
2889 def test_blockingioerror(self):
2890 # Various BlockingIOError issues
2891 self.assertRaises(TypeError, self.BlockingIOError)
2892 self.assertRaises(TypeError, self.BlockingIOError, 1)
2893 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2894 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2895 b = self.BlockingIOError(1, "")
2896 self.assertEqual(b.characters_written, 0)
2897 class C(unicode):
2898 pass
2899 c = C("")
2900 b = self.BlockingIOError(1, c)
2901 c.b = b
2902 b.c = c
2903 wr = weakref.ref(c)
2904 del c, b
2905 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002906 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002907
2908 def test_abcs(self):
2909 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002910 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2911 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2912 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2913 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002914
2915 def _check_abc_inheritance(self, abcmodule):
2916 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002917 self.assertIsInstance(f, abcmodule.IOBase)
2918 self.assertIsInstance(f, abcmodule.RawIOBase)
2919 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2920 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002921 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002922 self.assertIsInstance(f, abcmodule.IOBase)
2923 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2924 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2925 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002926 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002927 self.assertIsInstance(f, abcmodule.IOBase)
2928 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2929 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2930 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002931
2932 def test_abc_inheritance(self):
2933 # Test implementations inherit from their respective ABCs
2934 self._check_abc_inheritance(self)
2935
2936 def test_abc_inheritance_official(self):
2937 # Test implementations inherit from the official ABCs of the
2938 # baseline "io" module.
2939 self._check_abc_inheritance(io)
2940
Antoine Pitrou5aa7df32011-11-21 20:16:44 +01002941 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2942 def test_nonblock_pipe_write_bigbuf(self):
2943 self._test_nonblock_pipe_write(16*1024)
2944
2945 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2946 def test_nonblock_pipe_write_smallbuf(self):
2947 self._test_nonblock_pipe_write(1024)
2948
2949 def _set_non_blocking(self, fd):
2950 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2951 self.assertNotEqual(flags, -1)
2952 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2953 self.assertEqual(res, 0)
2954
2955 def _test_nonblock_pipe_write(self, bufsize):
2956 sent = []
2957 received = []
2958 r, w = os.pipe()
2959 self._set_non_blocking(r)
2960 self._set_non_blocking(w)
2961
2962 # To exercise all code paths in the C implementation we need
2963 # to play with buffer sizes. For instance, if we choose a
2964 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2965 # then we will never get a partial write of the buffer.
2966 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2967 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2968
2969 with rf, wf:
2970 for N in 9999, 73, 7574:
2971 try:
2972 i = 0
2973 while True:
2974 msg = bytes([i % 26 + 97]) * N
2975 sent.append(msg)
2976 wf.write(msg)
2977 i += 1
2978
2979 except self.BlockingIOError as e:
2980 self.assertEqual(e.args[0], errno.EAGAIN)
2981 sent[-1] = sent[-1][:e.characters_written]
2982 received.append(rf.read())
2983 msg = b'BLOCKED'
2984 wf.write(msg)
2985 sent.append(msg)
2986
2987 while True:
2988 try:
2989 wf.flush()
2990 break
2991 except self.BlockingIOError as e:
2992 self.assertEqual(e.args[0], errno.EAGAIN)
2993 self.assertEqual(e.characters_written, 0)
2994 received.append(rf.read())
2995
2996 received += iter(rf.read, None)
2997
2998 sent, received = b''.join(sent), b''.join(received)
2999 self.assertTrue(sent == received)
3000 self.assertTrue(wf.closed)
3001 self.assertTrue(rf.closed)
3002
class CMiscIOTest(MiscIOTest):
    # Runs the MiscIOTest checks with the ``io`` module as the
    # implementation under test.
    io = io
3005
class PyMiscIOTest(MiscIOTest):
    # Runs the MiscIOTest checks with ``pyio`` as the implementation under
    # test (presumably the pure-Python _pyio module; its import is outside
    # this part of the file).
    io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00003008
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003009
3010@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3011class SignalsTest(unittest.TestCase):
3012
3013 def setUp(self):
3014 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3015
3016 def tearDown(self):
3017 signal.signal(signal.SIGALRM, self.oldalrm)
3018
3019 def alarm_interrupt(self, sig, frame):
Florent Xicluna3fa3b002010-09-13 08:20:19 +00003020 1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003021
3022 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinner49d495f2011-07-04 11:44:46 +02003023 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
3024 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003025 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3026 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou6439c002011-02-25 21:35:47 +00003027 invokes the signal handler, and bubbles up the exception raised
3028 in the latter."""
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003029 read_results = []
3030 def _read():
3031 s = os.read(r, 1)
3032 read_results.append(s)
3033 t = threading.Thread(target=_read)
3034 t.daemon = True
3035 r, w = os.pipe()
3036 try:
3037 wio = self.io.open(w, **fdopen_kwargs)
3038 t.start()
3039 signal.alarm(1)
3040 # Fill the pipe enough that the write will be blocking.
3041 # It will be interrupted by the timer armed above. Since the
3042 # other thread has read one byte, the low-level write will
3043 # return with a successful (partial) result rather than an EINTR.
3044 # The buffered IO layer must check for pending signal
3045 # handlers, which in this case will invoke alarm_interrupt().
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03003046 try:
3047 with self.assertRaises(ZeroDivisionError):
3048 wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1))
3049 finally:
3050 t.join()
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003051 # We got one byte, get another one and check that it isn't a
3052 # repeat of the first one.
3053 read_results.append(os.read(r, 1))
3054 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3055 finally:
3056 os.close(w)
3057 os.close(r)
3058 # This is deliberate. If we didn't close the file descriptor
3059 # before closing wio, wio would try to flush its internal
3060 # buffer, and block again.
3061 try:
3062 wio.close()
3063 except IOError as e:
3064 if e.errno != errno.EBADF:
3065 raise
3066
3067 def test_interrupted_write_unbuffered(self):
3068 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3069
3070 def test_interrupted_write_buffered(self):
3071 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3072
3073 def test_interrupted_write_text(self):
3074 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3075
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003076 def check_reentrant_write(self, data, **fdopen_kwargs):
3077 def on_alarm(*args):
3078 # Will be called reentrantly from the same thread
3079 wio.write(data)
Victor Stinner4c41f842011-07-05 13:29:26 +02003080 1//0
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003081 signal.signal(signal.SIGALRM, on_alarm)
3082 r, w = os.pipe()
3083 wio = self.io.open(w, **fdopen_kwargs)
3084 try:
3085 signal.alarm(1)
3086 # Either the reentrant call to wio.write() fails with RuntimeError,
3087 # or the signal handler raises ZeroDivisionError.
3088 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3089 while 1:
3090 for i in range(100):
3091 wio.write(data)
3092 wio.flush()
3093 # Make sure the buffer doesn't fill up and block further writes
3094 os.read(r, len(data) * 100)
3095 exc = cm.exception
3096 if isinstance(exc, RuntimeError):
3097 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3098 finally:
3099 wio.close()
3100 os.close(r)
3101
3102 def test_reentrant_write_buffered(self):
3103 self.check_reentrant_write(b"xy", mode="wb")
3104
3105 def test_reentrant_write_text(self):
3106 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3107
Antoine Pitrou6439c002011-02-25 21:35:47 +00003108 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3109 """Check that a buffered read, when it gets interrupted (either
3110 returning a partial result or EINTR), properly invokes the signal
3111 handler and retries if the latter returned successfully."""
3112 r, w = os.pipe()
3113 fdopen_kwargs["closefd"] = False
3114 def alarm_handler(sig, frame):
3115 os.write(w, b"bar")
3116 signal.signal(signal.SIGALRM, alarm_handler)
3117 try:
3118 rio = self.io.open(r, **fdopen_kwargs)
3119 os.write(w, b"foo")
3120 signal.alarm(1)
3121 # Expected behaviour:
3122 # - first raw read() returns partial b"foo"
3123 # - second raw read() returns EINTR
3124 # - third raw read() returns b"bar"
3125 self.assertEqual(decode(rio.read(6)), "foobar")
3126 finally:
3127 rio.close()
3128 os.close(w)
3129 os.close(r)
3130
3131 def test_interrupterd_read_retry_buffered(self):
3132 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3133 mode="rb")
3134
3135 def test_interrupterd_read_retry_text(self):
3136 self.check_interrupted_read_retry(lambda x: x,
3137 mode="r")
3138
3139 @unittest.skipUnless(threading, 'Threading required for this test.')
3140 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3141 """Check that a buffered write, when it gets interrupted (either
3142 returning a partial result or EINTR), properly invokes the signal
3143 handler and retries if the latter returned successfully."""
3144 select = support.import_module("select")
3145 # A quantity that exceeds the buffer size of an anonymous pipe's
3146 # write end.
Antoine Pitrou68915d72013-04-24 23:31:38 +02003147 N = support.PIPE_MAX_SIZE
Antoine Pitrou6439c002011-02-25 21:35:47 +00003148 r, w = os.pipe()
3149 fdopen_kwargs["closefd"] = False
3150 # We need a separate thread to read from the pipe and allow the
3151 # write() to finish. This thread is started after the SIGALRM is
3152 # received (forcing a first EINTR in write()).
3153 read_results = []
3154 write_finished = False
Serhiy Storchaka53ea1622015-03-28 20:38:48 +02003155 error = [None]
Antoine Pitrou6439c002011-02-25 21:35:47 +00003156 def _read():
Serhiy Storchaka53ea1622015-03-28 20:38:48 +02003157 try:
3158 while not write_finished:
3159 while r in select.select([r], [], [], 1.0)[0]:
3160 s = os.read(r, 1024)
3161 read_results.append(s)
3162 except BaseException as exc:
3163 error[0] = exc
Antoine Pitrou6439c002011-02-25 21:35:47 +00003164 t = threading.Thread(target=_read)
3165 t.daemon = True
3166 def alarm1(sig, frame):
3167 signal.signal(signal.SIGALRM, alarm2)
3168 signal.alarm(1)
3169 def alarm2(sig, frame):
3170 t.start()
3171 signal.signal(signal.SIGALRM, alarm1)
3172 try:
3173 wio = self.io.open(w, **fdopen_kwargs)
3174 signal.alarm(1)
3175 # Expected behaviour:
3176 # - first raw write() is partial (because of the limited pipe buffer
3177 # and the first alarm)
3178 # - second raw write() returns EINTR (because of the second alarm)
3179 # - subsequent write()s are successful (either partial or complete)
3180 self.assertEqual(N, wio.write(item * N))
3181 wio.flush()
3182 write_finished = True
3183 t.join()
Serhiy Storchaka53ea1622015-03-28 20:38:48 +02003184
3185 self.assertIsNone(error[0])
Antoine Pitrou6439c002011-02-25 21:35:47 +00003186 self.assertEqual(N, sum(len(x) for x in read_results))
3187 finally:
3188 write_finished = True
3189 os.close(w)
3190 os.close(r)
3191 # This is deliberate. If we didn't close the file descriptor
3192 # before closing wio, wio would try to flush its internal
3193 # buffer, and could block (in case of failure).
3194 try:
3195 wio.close()
3196 except IOError as e:
3197 if e.errno != errno.EBADF:
3198 raise
3199
3200 def test_interrupterd_write_retry_buffered(self):
3201 self.check_interrupted_write_retry(b"x", mode="wb")
3202
3203 def test_interrupterd_write_retry_text(self):
3204 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3205
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003206
class CSignalsTest(SignalsTest):
    # Runs the SignalsTest checks with the ``io`` module as the
    # implementation under test.
    io = io
3209
class PySignalsTest(SignalsTest):
    # Runs the SignalsTest checks with ``pyio`` as the implementation under
    # test.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.  Rebinding the inherited names to None replaces
    # the test methods defined on SignalsTest for this subclass.
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None
3217
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003218
def test_main():
    """Attach the implementation namespaces to the test classes, then run them.

    Classes whose name starts with "C" receive the members of ``io`` plus
    the "C"-prefixed mock classes; classes whose name starts with "Py"
    receive the members of ``pyio`` plus the "Py"-prefixed mocks.  Other
    classes are run unchanged.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}

    # Each mock is published under its plain name but resolved to the
    # prefixed variant defined at module level.
    module_globals = globals()
    for mock in mocks:
        plain_name = mock.__name__
        c_io_ns[plain_name] = module_globals["C" + plain_name]
        py_io_ns[plain_name] = module_globals["Py" + plain_name]

    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    namespace_by_prefix = (("C", c_io_ns), ("Py", py_io_ns))
    for test in tests:
        for prefix, namespace in namespace_by_prefix:
            if test.__name__.startswith(prefix):
                for name, obj in namespace.items():
                    setattr(test, name, obj)
                # Only the first matching prefix applies.
                break

    support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00003253
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()