blob: b7f6046038daff4db416c9eb7350acffc26bf83e [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakac72e66a2015-11-02 15:06:09 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Antoine Pitrou19690592009-06-12 20:14:08 +000019# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +030032import warnings
Antoine Pitrou19690592009-06-12 20:14:08 +000033import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000034import signal
35import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000036from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000037from collections import deque
Antoine Pitrou78e761e2012-10-16 22:57:11 +020038from UserList import UserList
Antoine Pitrou19690592009-06-12 20:14:08 +000039from test import test_support as support
Serhiy Storchaka354d50e2013-02-03 17:10:42 +020040import contextlib
Christian Heimes1a6387e2008-03-26 12:49:49 +000041
42import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000043import io # C implementation of io
44import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000045try:
46 import threading
47except ImportError:
48 threading = None
Antoine Pitrou5aa7df32011-11-21 20:16:44 +010049try:
50 import fcntl
51except ImportError:
52 fcntl = None
Antoine Pitrou19690592009-06-12 20:14:08 +000053
54__metaclass__ = type
55bytes = support.py3k_bytes
56
57def _default_chunk_size():
58 """Get the default TextIOWrapper chunk size"""
59 with io.open(__file__, "r", encoding="latin1") as f:
60 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000061
62
Antoine Pitrou6391b342010-09-14 18:48:19 +000063class MockRawIOWithoutRead:
64 """A RawIO implementation without read(), so as to exercise the default
65 RawIO.read() which calls readinto()."""
Christian Heimes1a6387e2008-03-26 12:49:49 +000066
67 def __init__(self, read_stack=()):
68 self._read_stack = list(read_stack)
69 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +000070 self._reads = 0
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000071 self._extraneous_reads = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +000072
Christian Heimes1a6387e2008-03-26 12:49:49 +000073 def write(self, b):
Antoine Pitrou19690592009-06-12 20:14:08 +000074 self._write_stack.append(bytes(b))
Christian Heimes1a6387e2008-03-26 12:49:49 +000075 return len(b)
76
77 def writable(self):
78 return True
79
80 def fileno(self):
81 return 42
82
83 def readable(self):
84 return True
85
86 def seekable(self):
87 return True
88
89 def seek(self, pos, whence):
Antoine Pitrou19690592009-06-12 20:14:08 +000090 return 0 # wrong but we gotta return something
Christian Heimes1a6387e2008-03-26 12:49:49 +000091
92 def tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +000093 return 0 # same comment as above
94
95 def readinto(self, buf):
96 self._reads += 1
97 max_len = len(buf)
98 try:
99 data = self._read_stack[0]
100 except IndexError:
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000101 self._extraneous_reads += 1
Antoine Pitrou19690592009-06-12 20:14:08 +0000102 return 0
103 if data is None:
104 del self._read_stack[0]
105 return None
106 n = len(data)
107 if len(data) <= max_len:
108 del self._read_stack[0]
109 buf[:n] = data
110 return n
111 else:
112 buf[:] = data[:max_len]
113 self._read_stack[0] = data[max_len:]
114 return max_len
115
116 def truncate(self, pos=None):
117 return pos
118
Antoine Pitrou6391b342010-09-14 18:48:19 +0000119class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
120 pass
121
122class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
123 pass
124
125
126class MockRawIO(MockRawIOWithoutRead):
127
128 def read(self, n=None):
129 self._reads += 1
130 try:
131 return self._read_stack.pop(0)
132 except:
133 self._extraneous_reads += 1
134 return b""
135
Antoine Pitrou19690592009-06-12 20:14:08 +0000136class CMockRawIO(MockRawIO, io.RawIOBase):
137 pass
138
139class PyMockRawIO(MockRawIO, pyio.RawIOBase):
140 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000141
142
Antoine Pitrou19690592009-06-12 20:14:08 +0000143class MisbehavedRawIO(MockRawIO):
144 def write(self, b):
145 return MockRawIO.write(self, b) * 2
146
147 def read(self, n=None):
148 return MockRawIO.read(self, n) * 2
149
150 def seek(self, pos, whence):
151 return -123
152
153 def tell(self):
154 return -456
155
156 def readinto(self, buf):
157 MockRawIO.readinto(self, buf)
158 return len(buf) * 5
159
160class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
161 pass
162
163class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
164 pass
165
166
167class CloseFailureIO(MockRawIO):
168 closed = 0
169
170 def close(self):
171 if not self.closed:
172 self.closed = 1
173 raise IOError
174
175class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
176 pass
177
178class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
179 pass
180
181
182class MockFileIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000183
184 def __init__(self, data):
185 self.read_history = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000186 super(MockFileIO, self).__init__(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000187
188 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +0000189 res = super(MockFileIO, self).read(n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000190 self.read_history.append(None if res is None else len(res))
191 return res
192
Antoine Pitrou19690592009-06-12 20:14:08 +0000193 def readinto(self, b):
194 res = super(MockFileIO, self).readinto(b)
195 self.read_history.append(res)
196 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000197
Antoine Pitrou19690592009-06-12 20:14:08 +0000198class CMockFileIO(MockFileIO, io.BytesIO):
199 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000200
Antoine Pitrou19690592009-06-12 20:14:08 +0000201class PyMockFileIO(MockFileIO, pyio.BytesIO):
202 pass
203
204
205class MockNonBlockWriterIO:
206
207 def __init__(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000208 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000209 self._blocker_char = None
Christian Heimes1a6387e2008-03-26 12:49:49 +0000210
Antoine Pitrou19690592009-06-12 20:14:08 +0000211 def pop_written(self):
212 s = b"".join(self._write_stack)
213 self._write_stack[:] = []
214 return s
215
216 def block_on(self, char):
217 """Block when a given char is encountered."""
218 self._blocker_char = char
219
220 def readable(self):
221 return True
222
223 def seekable(self):
224 return True
Christian Heimes1a6387e2008-03-26 12:49:49 +0000225
226 def writable(self):
227 return True
228
Antoine Pitrou19690592009-06-12 20:14:08 +0000229 def write(self, b):
230 b = bytes(b)
231 n = -1
232 if self._blocker_char:
233 try:
234 n = b.index(self._blocker_char)
235 except ValueError:
236 pass
237 else:
Antoine Pitrou5aa7df32011-11-21 20:16:44 +0100238 if n > 0:
239 # write data up to the first blocker
240 self._write_stack.append(b[:n])
241 return n
242 else:
243 # cancel blocker and indicate would block
244 self._blocker_char = None
245 return None
Antoine Pitrou19690592009-06-12 20:14:08 +0000246 self._write_stack.append(b)
247 return len(b)
248
249class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
250 BlockingIOError = io.BlockingIOError
251
252class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
253 BlockingIOError = pyio.BlockingIOError
254
Christian Heimes1a6387e2008-03-26 12:49:49 +0000255
256class IOTest(unittest.TestCase):
257
Antoine Pitrou19690592009-06-12 20:14:08 +0000258 def setUp(self):
259 support.unlink(support.TESTFN)
260
Christian Heimes1a6387e2008-03-26 12:49:49 +0000261 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000262 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000263
264 def write_ops(self, f):
265 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000266 f.truncate(0)
267 self.assertEqual(f.tell(), 5)
268 f.seek(0)
269
270 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000271 self.assertEqual(f.seek(0), 0)
272 self.assertEqual(f.write(b"Hello."), 6)
273 self.assertEqual(f.tell(), 6)
274 self.assertEqual(f.seek(-1, 1), 5)
275 self.assertEqual(f.tell(), 5)
276 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
277 self.assertEqual(f.seek(0), 0)
278 self.assertEqual(f.write(b"h"), 1)
279 self.assertEqual(f.seek(-1, 2), 13)
280 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000281
Christian Heimes1a6387e2008-03-26 12:49:49 +0000282 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000283 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000284 self.assertRaises(TypeError, f.seek, 0.0)
285
286 def read_ops(self, f, buffered=False):
287 data = f.read(5)
288 self.assertEqual(data, b"hello")
289 data = bytearray(data)
290 self.assertEqual(f.readinto(data), 5)
291 self.assertEqual(data, b" worl")
292 self.assertEqual(f.readinto(data), 2)
293 self.assertEqual(len(data), 5)
294 self.assertEqual(data[:2], b"d\n")
295 self.assertEqual(f.seek(0), 0)
296 self.assertEqual(f.read(20), b"hello world\n")
297 self.assertEqual(f.read(1), b"")
298 self.assertEqual(f.readinto(bytearray(b"x")), 0)
299 self.assertEqual(f.seek(-6, 2), 6)
300 self.assertEqual(f.read(5), b"world")
301 self.assertEqual(f.read(0), b"")
302 self.assertEqual(f.readinto(bytearray()), 0)
303 self.assertEqual(f.seek(-6, 1), 5)
304 self.assertEqual(f.read(5), b" worl")
305 self.assertEqual(f.tell(), 10)
306 self.assertRaises(TypeError, f.seek, 0.0)
307 if buffered:
308 f.seek(0)
309 self.assertEqual(f.read(), b"hello world\n")
310 f.seek(6)
311 self.assertEqual(f.read(), b"world\n")
312 self.assertEqual(f.read(), b"")
313
314 LARGE = 2**31
315
316 def large_file_ops(self, f):
317 assert f.readable()
318 assert f.writable()
319 self.assertEqual(f.seek(self.LARGE), self.LARGE)
320 self.assertEqual(f.tell(), self.LARGE)
321 self.assertEqual(f.write(b"xxx"), 3)
322 self.assertEqual(f.tell(), self.LARGE + 3)
323 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
324 self.assertEqual(f.truncate(), self.LARGE + 2)
325 self.assertEqual(f.tell(), self.LARGE + 2)
326 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
327 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000328 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000329 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
330 self.assertEqual(f.seek(-1, 2), self.LARGE)
331 self.assertEqual(f.read(2), b"x")
332
Antoine Pitrou19690592009-06-12 20:14:08 +0000333 def test_invalid_operations(self):
334 # Try writing on a file opened in read mode and vice-versa.
335 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000336 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000337 self.assertRaises(IOError, fp.read)
338 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000339 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000340 self.assertRaises(IOError, fp.write, b"blah")
341 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000342 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000343 self.assertRaises(IOError, fp.write, "blah")
344 self.assertRaises(IOError, fp.writelines, ["blah\n"])
345
Christian Heimes1a6387e2008-03-26 12:49:49 +0000346 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000347 with self.open(support.TESTFN, "wb", buffering=0) as f:
348 self.assertEqual(f.readable(), False)
349 self.assertEqual(f.writable(), True)
350 self.assertEqual(f.seekable(), True)
351 self.write_ops(f)
352 with self.open(support.TESTFN, "rb", buffering=0) as f:
353 self.assertEqual(f.readable(), True)
354 self.assertEqual(f.writable(), False)
355 self.assertEqual(f.seekable(), True)
356 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000357
358 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000359 with self.open(support.TESTFN, "wb") as f:
360 self.assertEqual(f.readable(), False)
361 self.assertEqual(f.writable(), True)
362 self.assertEqual(f.seekable(), True)
363 self.write_ops(f)
364 with self.open(support.TESTFN, "rb") as f:
365 self.assertEqual(f.readable(), True)
366 self.assertEqual(f.writable(), False)
367 self.assertEqual(f.seekable(), True)
368 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000369
370 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000371 with self.open(support.TESTFN, "wb") as f:
372 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
373 with self.open(support.TESTFN, "rb") as f:
374 self.assertEqual(f.readline(), b"abc\n")
375 self.assertEqual(f.readline(10), b"def\n")
376 self.assertEqual(f.readline(2), b"xy")
377 self.assertEqual(f.readline(4), b"zzy\n")
378 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000379 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000380 self.assertRaises(TypeError, f.readline, 5.3)
381 with self.open(support.TESTFN, "r") as f:
382 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000383
384 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000385 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000386 self.write_ops(f)
387 data = f.getvalue()
388 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000389 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000390 self.read_ops(f, True)
391
392 def test_large_file_ops(self):
393 # On Windows and Mac OSX this test comsumes large resources; It takes
394 # a long time to build the >2GB file and takes >2GB of disk space
395 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000396 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware1f702212013-12-10 14:09:20 -0600397 support.requires(
398 'largefile',
399 'test requires %s bytes and a long time to run' % self.LARGE)
Antoine Pitrou19690592009-06-12 20:14:08 +0000400 with self.open(support.TESTFN, "w+b", 0) as f:
401 self.large_file_ops(f)
402 with self.open(support.TESTFN, "w+b") as f:
403 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000404
405 def test_with_open(self):
406 for bufsize in (0, 1, 100):
407 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000408 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000409 f.write(b"xxx")
410 self.assertEqual(f.closed, True)
411 f = None
412 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000413 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000414 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000415 except ZeroDivisionError:
416 self.assertEqual(f.closed, True)
417 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000418 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000419
Antoine Pitroue741cc62009-01-21 00:45:36 +0000420 # issue 5008
421 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000422 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000423 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000424 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000425 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000426 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000427 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000428 with self.open(support.TESTFN, "a") as f:
Serhiy Storchakaea4d2872015-08-02 15:19:04 +0300429 self.assertGreater(f.tell(), 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000430
Christian Heimes1a6387e2008-03-26 12:49:49 +0000431 def test_destructor(self):
432 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000433 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000434 def __del__(self):
435 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000436 try:
437 f = super(MyFileIO, self).__del__
438 except AttributeError:
439 pass
440 else:
441 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000442 def close(self):
443 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000444 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000445 def flush(self):
446 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000447 super(MyFileIO, self).flush()
448 f = MyFileIO(support.TESTFN, "wb")
449 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000450 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000451 support.gc_collect()
452 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000453 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000454 self.assertEqual(f.read(), b"xxx")
455
456 def _check_base_destructor(self, base):
457 record = []
458 class MyIO(base):
459 def __init__(self):
460 # This exercises the availability of attributes on object
461 # destruction.
462 # (in the C version, close() is called by the tp_dealloc
463 # function, not by __del__)
464 self.on_del = 1
465 self.on_close = 2
466 self.on_flush = 3
467 def __del__(self):
468 record.append(self.on_del)
469 try:
470 f = super(MyIO, self).__del__
471 except AttributeError:
472 pass
473 else:
474 f()
475 def close(self):
476 record.append(self.on_close)
477 super(MyIO, self).close()
478 def flush(self):
479 record.append(self.on_flush)
480 super(MyIO, self).flush()
481 f = MyIO()
482 del f
483 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000484 self.assertEqual(record, [1, 2, 3])
485
Antoine Pitrou19690592009-06-12 20:14:08 +0000486 def test_IOBase_destructor(self):
487 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000488
Antoine Pitrou19690592009-06-12 20:14:08 +0000489 def test_RawIOBase_destructor(self):
490 self._check_base_destructor(self.RawIOBase)
491
492 def test_BufferedIOBase_destructor(self):
493 self._check_base_destructor(self.BufferedIOBase)
494
495 def test_TextIOBase_destructor(self):
496 self._check_base_destructor(self.TextIOBase)
497
498 def test_close_flushes(self):
499 with self.open(support.TESTFN, "wb") as f:
500 f.write(b"xxx")
501 with self.open(support.TESTFN, "rb") as f:
502 self.assertEqual(f.read(), b"xxx")
503
504 def test_array_writes(self):
505 a = array.array(b'i', range(10))
506 n = len(a.tostring())
507 with self.open(support.TESTFN, "wb", 0) as f:
508 self.assertEqual(f.write(a), n)
509 with self.open(support.TESTFN, "wb") as f:
510 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000511
512 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000513 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000514 closefd=False)
515
Antoine Pitrou19690592009-06-12 20:14:08 +0000516 def test_read_closed(self):
517 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000518 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000519 with self.open(support.TESTFN, "r") as f:
520 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000521 self.assertEqual(file.read(), "egg\n")
522 file.seek(0)
523 file.close()
524 self.assertRaises(ValueError, file.read)
525
526 def test_no_closefd_with_filename(self):
527 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000528 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000529
530 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000531 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000532 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000533 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000534 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000535 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000536 self.assertEqual(file.buffer.raw.closefd, False)
537
Antoine Pitrou19690592009-06-12 20:14:08 +0000538 def test_garbage_collection(self):
539 # FileIO objects are collected, and collecting them flushes
540 # all data to disk.
541 f = self.FileIO(support.TESTFN, "wb")
542 f.write(b"abcxxx")
543 f.f = f
544 wr = weakref.ref(f)
545 del f
546 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +0300547 self.assertIsNone(wr(), wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000548 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000549 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000550
Antoine Pitrou19690592009-06-12 20:14:08 +0000551 def test_unbounded_file(self):
552 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
553 zero = "/dev/zero"
554 if not os.path.exists(zero):
555 self.skipTest("{0} does not exist".format(zero))
556 if sys.maxsize > 0x7FFFFFFF:
557 self.skipTest("test can only run in a 32-bit address space")
558 if support.real_max_memuse < support._2G:
559 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000560 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000561 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000562 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000563 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000564 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000565 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000566
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200567 def check_flush_error_on_close(self, *args, **kwargs):
568 # Test that the file is closed despite failed flush
569 # and that flush() is called before file closed.
570 f = self.open(*args, **kwargs)
571 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000572 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200573 closed[:] = [f.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000574 raise IOError()
575 f.flush = bad_flush
576 self.assertRaises(IOError, f.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600577 self.assertTrue(f.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200578 self.assertTrue(closed) # flush() called
579 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchaka437d5352015-02-23 00:28:38 +0200580 f.flush = lambda: None # break reference loop
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200581
582 def test_flush_error_on_close(self):
583 # raw file
584 # Issue #5700: io.FileIO calls flush() after file closed
585 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
586 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
587 self.check_flush_error_on_close(fd, 'wb', buffering=0)
588 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
589 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
590 os.close(fd)
591 # buffered io
592 self.check_flush_error_on_close(support.TESTFN, 'wb')
593 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
594 self.check_flush_error_on_close(fd, 'wb')
595 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
596 self.check_flush_error_on_close(fd, 'wb', closefd=False)
597 os.close(fd)
598 # text io
599 self.check_flush_error_on_close(support.TESTFN, 'w')
600 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
601 self.check_flush_error_on_close(fd, 'w')
602 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
603 self.check_flush_error_on_close(fd, 'w', closefd=False)
604 os.close(fd)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000605
606 def test_multi_close(self):
607 f = self.open(support.TESTFN, "wb", buffering=0)
608 f.close()
609 f.close()
610 f.close()
611 self.assertRaises(ValueError, f.flush)
612
Antoine Pitrou6391b342010-09-14 18:48:19 +0000613 def test_RawIOBase_read(self):
614 # Exercise the default RawIOBase.read() implementation (which calls
615 # readinto() internally).
616 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
617 self.assertEqual(rawio.read(2), b"ab")
618 self.assertEqual(rawio.read(2), b"c")
619 self.assertEqual(rawio.read(2), b"d")
620 self.assertEqual(rawio.read(2), None)
621 self.assertEqual(rawio.read(2), b"ef")
622 self.assertEqual(rawio.read(2), b"g")
623 self.assertEqual(rawio.read(2), None)
624 self.assertEqual(rawio.read(2), b"")
625
Hynek Schlawack877effc2012-05-25 09:24:18 +0200626 def test_fileio_closefd(self):
627 # Issue #4841
628 with self.open(__file__, 'rb') as f1, \
629 self.open(__file__, 'rb') as f2:
630 fileio = self.FileIO(f1.fileno(), closefd=False)
631 # .__init__() must not close f1
632 fileio.__init__(f2.fileno(), closefd=False)
633 f1.readline()
634 # .close() must not close f2
635 fileio.close()
636 f2.readline()
637
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +0300638 def test_nonbuffered_textio(self):
639 with warnings.catch_warnings(record=True) as recorded:
640 with self.assertRaises(ValueError):
641 self.open(support.TESTFN, 'w', buffering=0)
642 support.gc_collect()
643 self.assertEqual(recorded, [])
644
645 def test_invalid_newline(self):
646 with warnings.catch_warnings(record=True) as recorded:
647 with self.assertRaises(ValueError):
648 self.open(support.TESTFN, 'w', newline='invalid')
649 support.gc_collect()
650 self.assertEqual(recorded, [])
651
Hynek Schlawack877effc2012-05-25 09:24:18 +0200652
Antoine Pitrou19690592009-06-12 20:14:08 +0000653class CIOTest(IOTest):
Antoine Pitrou16166452011-07-12 22:04:20 +0200654
655 def test_IOBase_finalize(self):
656 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
657 # class which inherits IOBase and an object of this class are caught
658 # in a reference cycle and close() is already in the method cache.
659 class MyIO(self.IOBase):
660 def close(self):
661 pass
662
663 # create an instance to populate the method cache
664 MyIO()
665 obj = MyIO()
666 obj.obj = obj
667 wr = weakref.ref(obj)
668 del MyIO
669 del obj
670 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +0300671 self.assertIsNone(wr(), wr)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000672
Antoine Pitrou19690592009-06-12 20:14:08 +0000673class PyIOTest(IOTest):
674 test_array_writes = unittest.skip(
675 "len(array.array) returns number of elements rather than bytelength"
676 )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000677
678
Antoine Pitrou19690592009-06-12 20:14:08 +0000679class CommonBufferedTests:
680 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
681
682 def test_detach(self):
683 raw = self.MockRawIO()
684 buf = self.tp(raw)
685 self.assertIs(buf.detach(), raw)
686 self.assertRaises(ValueError, buf.detach)
687
Benjamin Peterson53ae6142014-12-21 20:51:50 -0600688 repr(buf) # Should still work
689
Antoine Pitrou19690592009-06-12 20:14:08 +0000690 def test_fileno(self):
691 rawio = self.MockRawIO()
692 bufio = self.tp(rawio)
693
Ezio Melotti2623a372010-11-21 13:34:58 +0000694 self.assertEqual(42, bufio.fileno())
Antoine Pitrou19690592009-06-12 20:14:08 +0000695
Antoine Pitrou19690592009-06-12 20:14:08 +0000696 def test_invalid_args(self):
697 rawio = self.MockRawIO()
698 bufio = self.tp(rawio)
699 # Invalid whence
700 self.assertRaises(ValueError, bufio.seek, 0, -1)
701 self.assertRaises(ValueError, bufio.seek, 0, 3)
702
703 def test_override_destructor(self):
704 tp = self.tp
705 record = []
706 class MyBufferedIO(tp):
707 def __del__(self):
708 record.append(1)
709 try:
710 f = super(MyBufferedIO, self).__del__
711 except AttributeError:
712 pass
713 else:
714 f()
715 def close(self):
716 record.append(2)
717 super(MyBufferedIO, self).close()
718 def flush(self):
719 record.append(3)
720 super(MyBufferedIO, self).flush()
721 rawio = self.MockRawIO()
722 bufio = MyBufferedIO(rawio)
723 writable = bufio.writable()
724 del bufio
725 support.gc_collect()
726 if writable:
727 self.assertEqual(record, [1, 2, 3])
728 else:
729 self.assertEqual(record, [1, 2])
730
731 def test_context_manager(self):
732 # Test usability as a context manager
733 rawio = self.MockRawIO()
734 bufio = self.tp(rawio)
735 def _with():
736 with bufio:
737 pass
738 _with()
739 # bufio should now be closed, and using it a second time should raise
740 # a ValueError.
741 self.assertRaises(ValueError, _with)
742
743 def test_error_through_destructor(self):
744 # Test that the exception state is not modified by a destructor,
745 # even if close() fails.
746 rawio = self.CloseFailureIO()
747 def f():
748 self.tp(rawio).xyzzy
749 with support.captured_output("stderr") as s:
750 self.assertRaises(AttributeError, f)
751 s = s.getvalue().strip()
752 if s:
753 # The destructor *may* have printed an unraisable error, check it
754 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000755 self.assertTrue(s.startswith("Exception IOError: "), s)
756 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000757
758 def test_repr(self):
759 raw = self.MockRawIO()
760 b = self.tp(raw)
761 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
762 self.assertEqual(repr(b), "<%s>" % clsname)
763 raw.name = "dummy"
764 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
765 raw.name = b"dummy"
766 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000767
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000768 def test_flush_error_on_close(self):
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200769 # Test that buffered file is closed despite failed flush
770 # and that flush() is called before file closed.
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000771 raw = self.MockRawIO()
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200772 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000773 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200774 closed[:] = [b.closed, raw.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000775 raise IOError()
776 raw.flush = bad_flush
777 b = self.tp(raw)
778 self.assertRaises(IOError, b.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600779 self.assertTrue(b.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200780 self.assertTrue(raw.closed)
781 self.assertTrue(closed) # flush() called
782 self.assertFalse(closed[0]) # flush() called before file closed
783 self.assertFalse(closed[1])
Serhiy Storchaka437d5352015-02-23 00:28:38 +0200784 raw.flush = lambda: None # break reference loop
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600785
786 def test_close_error_on_close(self):
787 raw = self.MockRawIO()
788 def bad_flush():
789 raise IOError('flush')
790 def bad_close():
791 raise IOError('close')
792 raw.close = bad_close
793 b = self.tp(raw)
794 b.flush = bad_flush
795 with self.assertRaises(IOError) as err: # exception not swallowed
796 b.close()
797 self.assertEqual(err.exception.args, ('close',))
798 self.assertFalse(b.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000799
800 def test_multi_close(self):
801 raw = self.MockRawIO()
802 b = self.tp(raw)
803 b.close()
804 b.close()
805 b.close()
806 self.assertRaises(ValueError, b.flush)
807
Antoine Pitroufc9ead62010-12-21 21:26:55 +0000808 def test_readonly_attributes(self):
809 raw = self.MockRawIO()
810 buf = self.tp(raw)
811 x = self.MockRawIO()
812 with self.assertRaises((AttributeError, TypeError)):
813 buf.raw = x
814
Christian Heimes1a6387e2008-03-26 12:49:49 +0000815
Antoine Pitroubff5df02012-07-29 19:02:46 +0200816class SizeofTest:
817
818 @support.cpython_only
819 def test_sizeof(self):
820 bufsize1 = 4096
821 bufsize2 = 8192
822 rawio = self.MockRawIO()
823 bufio = self.tp(rawio, buffer_size=bufsize1)
824 size = sys.getsizeof(bufio) - bufsize1
825 rawio = self.MockRawIO()
826 bufio = self.tp(rawio, buffer_size=bufsize2)
827 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
828
829
Antoine Pitrou19690592009-06-12 20:14:08 +0000830class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
831 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000832
Antoine Pitrou19690592009-06-12 20:14:08 +0000833 def test_constructor(self):
834 rawio = self.MockRawIO([b"abc"])
835 bufio = self.tp(rawio)
836 bufio.__init__(rawio)
837 bufio.__init__(rawio, buffer_size=1024)
838 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000839 self.assertEqual(b"abc", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000840 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
841 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
842 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
843 rawio = self.MockRawIO([b"abc"])
844 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000845 self.assertEqual(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000846
Serhiy Storchaka1d19f972014-02-12 10:52:07 +0200847 def test_uninitialized(self):
848 bufio = self.tp.__new__(self.tp)
849 del bufio
850 bufio = self.tp.__new__(self.tp)
851 self.assertRaisesRegexp((ValueError, AttributeError),
852 'uninitialized|has no attribute',
853 bufio.read, 0)
854 bufio.__init__(self.MockRawIO())
855 self.assertEqual(bufio.read(0), b'')
856
Antoine Pitrou19690592009-06-12 20:14:08 +0000857 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000858 for arg in (None, 7):
859 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
860 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000861 self.assertEqual(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000862 # Invalid args
863 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000864
Antoine Pitrou19690592009-06-12 20:14:08 +0000865 def test_read1(self):
866 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
867 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000868 self.assertEqual(b"a", bufio.read(1))
869 self.assertEqual(b"b", bufio.read1(1))
870 self.assertEqual(rawio._reads, 1)
871 self.assertEqual(b"c", bufio.read1(100))
872 self.assertEqual(rawio._reads, 1)
873 self.assertEqual(b"d", bufio.read1(100))
874 self.assertEqual(rawio._reads, 2)
875 self.assertEqual(b"efg", bufio.read1(100))
876 self.assertEqual(rawio._reads, 3)
877 self.assertEqual(b"", bufio.read1(100))
878 self.assertEqual(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000879 # Invalid args
880 self.assertRaises(ValueError, bufio.read1, -1)
881
882 def test_readinto(self):
883 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
884 bufio = self.tp(rawio)
885 b = bytearray(2)
Ezio Melotti2623a372010-11-21 13:34:58 +0000886 self.assertEqual(bufio.readinto(b), 2)
887 self.assertEqual(b, b"ab")
888 self.assertEqual(bufio.readinto(b), 2)
889 self.assertEqual(b, b"cd")
890 self.assertEqual(bufio.readinto(b), 2)
891 self.assertEqual(b, b"ef")
892 self.assertEqual(bufio.readinto(b), 1)
893 self.assertEqual(b, b"gf")
894 self.assertEqual(bufio.readinto(b), 0)
895 self.assertEqual(b, b"gf")
Antoine Pitrou19690592009-06-12 20:14:08 +0000896
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000897 def test_readlines(self):
898 def bufio():
899 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
900 return self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000901 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
902 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
903 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000904
Antoine Pitrou19690592009-06-12 20:14:08 +0000905 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000906 data = b"abcdefghi"
907 dlen = len(data)
908
909 tests = [
910 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
911 [ 100, [ 3, 3, 3], [ dlen ] ],
912 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
913 ]
914
915 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000916 rawio = self.MockFileIO(data)
917 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000918 pos = 0
919 for nbytes in buf_read_sizes:
Ezio Melotti2623a372010-11-21 13:34:58 +0000920 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Christian Heimes1a6387e2008-03-26 12:49:49 +0000921 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000922 # this is mildly implementation-dependent
Ezio Melotti2623a372010-11-21 13:34:58 +0000923 self.assertEqual(rawio.read_history, raw_read_sizes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000924
Antoine Pitrou19690592009-06-12 20:14:08 +0000925 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000926 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000927 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
928 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000929 self.assertEqual(b"abcd", bufio.read(6))
930 self.assertEqual(b"e", bufio.read(1))
931 self.assertEqual(b"fg", bufio.read())
932 self.assertEqual(b"", bufio.peek(1))
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200933 self.assertIsNone(bufio.read())
Ezio Melotti2623a372010-11-21 13:34:58 +0000934 self.assertEqual(b"", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000935
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200936 rawio = self.MockRawIO((b"a", None, None))
937 self.assertEqual(b"a", rawio.readall())
938 self.assertIsNone(rawio.readall())
939
Antoine Pitrou19690592009-06-12 20:14:08 +0000940 def test_read_past_eof(self):
941 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
942 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000943
Ezio Melotti2623a372010-11-21 13:34:58 +0000944 self.assertEqual(b"abcdefg", bufio.read(9000))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000945
Antoine Pitrou19690592009-06-12 20:14:08 +0000946 def test_read_all(self):
947 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
948 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000949
Ezio Melotti2623a372010-11-21 13:34:58 +0000950 self.assertEqual(b"abcdefg", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000951
Victor Stinner6a102812010-04-27 23:55:59 +0000952 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +0000953 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +0000954 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000955 try:
956 # Write out many bytes with exactly the same number of 0's,
957 # 1's... 255's. This will help us check that concurrent reading
958 # doesn't duplicate or forget contents.
959 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +0000960 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000961 random.shuffle(l)
962 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000963 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000964 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000965 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000966 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000967 errors = []
968 results = []
969 def f():
970 try:
971 # Intra-buffer read then buffer-flushing read
972 for n in cycle([1, 19]):
973 s = bufio.read(n)
974 if not s:
975 break
976 # list.append() is atomic
977 results.append(s)
978 except Exception as e:
979 errors.append(e)
980 raise
981 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchakabd8c6292015-04-01 12:56:39 +0300982 with support.start_threads(threads):
983 time.sleep(0.02) # yield
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000984 self.assertFalse(errors,
985 "the following exceptions were caught: %r" % errors)
986 s = b''.join(results)
987 for i in range(256):
988 c = bytes(bytearray([i]))
989 self.assertEqual(s.count(c), N)
990 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +0000991 support.unlink(support.TESTFN)
992
993 def test_misbehaved_io(self):
994 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
995 bufio = self.tp(rawio)
996 self.assertRaises(IOError, bufio.seek, 0)
997 self.assertRaises(IOError, bufio.tell)
998
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000999 def test_no_extraneous_read(self):
1000 # Issue #9550; when the raw IO object has satisfied the read request,
1001 # we should not issue any additional reads, otherwise it may block
1002 # (e.g. socket).
1003 bufsize = 16
1004 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1005 rawio = self.MockRawIO([b"x" * n])
1006 bufio = self.tp(rawio, bufsize)
1007 self.assertEqual(bufio.read(n), b"x" * n)
1008 # Simple case: one raw read is enough to satisfy the request.
1009 self.assertEqual(rawio._extraneous_reads, 0,
1010 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1011 # A more complex case where two raw reads are needed to satisfy
1012 # the request.
1013 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1014 bufio = self.tp(rawio, bufsize)
1015 self.assertEqual(bufio.read(n), b"x" * n)
1016 self.assertEqual(rawio._extraneous_reads, 0,
1017 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1018
1019
Antoine Pitroubff5df02012-07-29 19:02:46 +02001020class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001021 tp = io.BufferedReader
1022
1023 def test_constructor(self):
1024 BufferedReaderTest.test_constructor(self)
1025 # The allocation can succeed on 32-bit builds, e.g. with more
1026 # than 2GB RAM and a 64-bit kernel.
1027 if sys.maxsize > 0x7FFFFFFF:
1028 rawio = self.MockRawIO()
1029 bufio = self.tp(rawio)
1030 self.assertRaises((OverflowError, MemoryError, ValueError),
1031 bufio.__init__, rawio, sys.maxsize)
1032
1033 def test_initialization(self):
1034 rawio = self.MockRawIO([b"abc"])
1035 bufio = self.tp(rawio)
1036 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1037 self.assertRaises(ValueError, bufio.read)
1038 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1039 self.assertRaises(ValueError, bufio.read)
1040 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1041 self.assertRaises(ValueError, bufio.read)
1042
1043 def test_misbehaved_io_read(self):
1044 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1045 bufio = self.tp(rawio)
1046 # _pyio.BufferedReader seems to implement reading different, so that
1047 # checking this is not so easy.
1048 self.assertRaises(IOError, bufio.read, 10)
1049
1050 def test_garbage_collection(self):
1051 # C BufferedReader objects are collected.
1052 # The Python version has __del__, so it ends into gc.garbage instead
1053 rawio = self.FileIO(support.TESTFN, "w+b")
1054 f = self.tp(rawio)
1055 f.f = f
1056 wr = weakref.ref(f)
1057 del f
1058 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03001059 self.assertIsNone(wr(), wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00001060
R David Murray5b2cf5e2013-02-23 22:11:21 -05001061 def test_args_error(self):
1062 # Issue #17275
1063 with self.assertRaisesRegexp(TypeError, "BufferedReader"):
1064 self.tp(io.BytesIO(), 1024, 1024, 1024)
1065
1066
Antoine Pitrou19690592009-06-12 20:14:08 +00001067class PyBufferedReaderTest(BufferedReaderTest):
1068 tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001069
1070
Antoine Pitrou19690592009-06-12 20:14:08 +00001071class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1072 write_mode = "wb"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001073
Antoine Pitrou19690592009-06-12 20:14:08 +00001074 def test_constructor(self):
1075 rawio = self.MockRawIO()
1076 bufio = self.tp(rawio)
1077 bufio.__init__(rawio)
1078 bufio.__init__(rawio, buffer_size=1024)
1079 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +00001080 self.assertEqual(3, bufio.write(b"abc"))
Antoine Pitrou19690592009-06-12 20:14:08 +00001081 bufio.flush()
1082 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1083 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1084 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1085 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +00001086 self.assertEqual(3, bufio.write(b"ghi"))
Antoine Pitrou19690592009-06-12 20:14:08 +00001087 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001088 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001089
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001090 def test_uninitialized(self):
1091 bufio = self.tp.__new__(self.tp)
1092 del bufio
1093 bufio = self.tp.__new__(self.tp)
1094 self.assertRaisesRegexp((ValueError, AttributeError),
1095 'uninitialized|has no attribute',
1096 bufio.write, b'')
1097 bufio.__init__(self.MockRawIO())
1098 self.assertEqual(bufio.write(b''), 0)
1099
Antoine Pitrou19690592009-06-12 20:14:08 +00001100 def test_detach_flush(self):
1101 raw = self.MockRawIO()
1102 buf = self.tp(raw)
1103 buf.write(b"howdy!")
1104 self.assertFalse(raw._write_stack)
1105 buf.detach()
1106 self.assertEqual(raw._write_stack, [b"howdy!"])
1107
1108 def test_write(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001109 # Write to the buffered IO but don't overflow the buffer.
Antoine Pitrou19690592009-06-12 20:14:08 +00001110 writer = self.MockRawIO()
1111 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001112 bufio.write(b"abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001113 self.assertFalse(writer._write_stack)
1114
Antoine Pitrou19690592009-06-12 20:14:08 +00001115 def test_write_overflow(self):
1116 writer = self.MockRawIO()
1117 bufio = self.tp(writer, 8)
1118 contents = b"abcdefghijklmnop"
1119 for n in range(0, len(contents), 3):
1120 bufio.write(contents[n:n+3])
1121 flushed = b"".join(writer._write_stack)
1122 # At least (total - 8) bytes were implicitly flushed, perhaps more
1123 # depending on the implementation.
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001124 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001125
Antoine Pitrou19690592009-06-12 20:14:08 +00001126 def check_writes(self, intermediate_func):
1127 # Lots of writes, test the flushed output is as expected.
1128 contents = bytes(range(256)) * 1000
1129 n = 0
1130 writer = self.MockRawIO()
1131 bufio = self.tp(writer, 13)
1132 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1133 def gen_sizes():
1134 for size in count(1):
1135 for i in range(15):
1136 yield size
1137 sizes = gen_sizes()
1138 while n < len(contents):
1139 size = min(next(sizes), len(contents) - n)
Ezio Melotti2623a372010-11-21 13:34:58 +00001140 self.assertEqual(bufio.write(contents[n:n+size]), size)
Antoine Pitrou19690592009-06-12 20:14:08 +00001141 intermediate_func(bufio)
1142 n += size
1143 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001144 self.assertEqual(contents,
Antoine Pitrou19690592009-06-12 20:14:08 +00001145 b"".join(writer._write_stack))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001146
Antoine Pitrou19690592009-06-12 20:14:08 +00001147 def test_writes(self):
1148 self.check_writes(lambda bufio: None)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001149
Antoine Pitrou19690592009-06-12 20:14:08 +00001150 def test_writes_and_flushes(self):
1151 self.check_writes(lambda bufio: bufio.flush())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001152
Antoine Pitrou19690592009-06-12 20:14:08 +00001153 def test_writes_and_seeks(self):
1154 def _seekabs(bufio):
1155 pos = bufio.tell()
1156 bufio.seek(pos + 1, 0)
1157 bufio.seek(pos - 1, 0)
1158 bufio.seek(pos, 0)
1159 self.check_writes(_seekabs)
1160 def _seekrel(bufio):
1161 pos = bufio.seek(0, 1)
1162 bufio.seek(+1, 1)
1163 bufio.seek(-1, 1)
1164 bufio.seek(pos, 0)
1165 self.check_writes(_seekrel)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001166
Antoine Pitrou19690592009-06-12 20:14:08 +00001167 def test_writes_and_truncates(self):
1168 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001169
Antoine Pitrou19690592009-06-12 20:14:08 +00001170 def test_write_non_blocking(self):
1171 raw = self.MockNonBlockWriterIO()
1172 bufio = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001173
Ezio Melotti2623a372010-11-21 13:34:58 +00001174 self.assertEqual(bufio.write(b"abcd"), 4)
1175 self.assertEqual(bufio.write(b"efghi"), 5)
Antoine Pitrou19690592009-06-12 20:14:08 +00001176 # 1 byte will be written, the rest will be buffered
1177 raw.block_on(b"k")
Ezio Melotti2623a372010-11-21 13:34:58 +00001178 self.assertEqual(bufio.write(b"jklmn"), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001179
Antoine Pitrou19690592009-06-12 20:14:08 +00001180 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1181 raw.block_on(b"0")
1182 try:
1183 bufio.write(b"opqrwxyz0123456789")
1184 except self.BlockingIOError as e:
1185 written = e.characters_written
1186 else:
1187 self.fail("BlockingIOError should have been raised")
Ezio Melotti2623a372010-11-21 13:34:58 +00001188 self.assertEqual(written, 16)
1189 self.assertEqual(raw.pop_written(),
Antoine Pitrou19690592009-06-12 20:14:08 +00001190 b"abcdefghijklmnopqrwxyz")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001191
Ezio Melotti2623a372010-11-21 13:34:58 +00001192 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Antoine Pitrou19690592009-06-12 20:14:08 +00001193 s = raw.pop_written()
1194 # Previously buffered bytes were flushed
1195 self.assertTrue(s.startswith(b"01234567A"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001196
Antoine Pitrou19690592009-06-12 20:14:08 +00001197 def test_write_and_rewind(self):
1198 raw = io.BytesIO()
1199 bufio = self.tp(raw, 4)
1200 self.assertEqual(bufio.write(b"abcdef"), 6)
1201 self.assertEqual(bufio.tell(), 6)
1202 bufio.seek(0, 0)
1203 self.assertEqual(bufio.write(b"XY"), 2)
1204 bufio.seek(6, 0)
1205 self.assertEqual(raw.getvalue(), b"XYcdef")
1206 self.assertEqual(bufio.write(b"123456"), 6)
1207 bufio.flush()
1208 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001209
Antoine Pitrou19690592009-06-12 20:14:08 +00001210 def test_flush(self):
1211 writer = self.MockRawIO()
1212 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001213 bufio.write(b"abc")
1214 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001215 self.assertEqual(b"abc", writer._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001216
Antoine Pitrou78e761e2012-10-16 22:57:11 +02001217 def test_writelines(self):
1218 l = [b'ab', b'cd', b'ef']
1219 writer = self.MockRawIO()
1220 bufio = self.tp(writer, 8)
1221 bufio.writelines(l)
1222 bufio.flush()
1223 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1224
1225 def test_writelines_userlist(self):
1226 l = UserList([b'ab', b'cd', b'ef'])
1227 writer = self.MockRawIO()
1228 bufio = self.tp(writer, 8)
1229 bufio.writelines(l)
1230 bufio.flush()
1231 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1232
1233 def test_writelines_error(self):
1234 writer = self.MockRawIO()
1235 bufio = self.tp(writer, 8)
1236 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1237 self.assertRaises(TypeError, bufio.writelines, None)
1238
Antoine Pitrou19690592009-06-12 20:14:08 +00001239 def test_destructor(self):
1240 writer = self.MockRawIO()
1241 bufio = self.tp(writer, 8)
1242 bufio.write(b"abc")
1243 del bufio
1244 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001245 self.assertEqual(b"abc", writer._write_stack[0])
Antoine Pitrou19690592009-06-12 20:14:08 +00001246
1247 def test_truncate(self):
1248 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001249 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001250 bufio = self.tp(raw, 8)
1251 bufio.write(b"abcdef")
1252 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001253 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001254 with self.open(support.TESTFN, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001255 self.assertEqual(f.read(), b"abc")
1256
Victor Stinner6a102812010-04-27 23:55:59 +00001257 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +00001258 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +00001259 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001260 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001261 # Write out many bytes from many threads and test they were
1262 # all flushed.
1263 N = 1000
1264 contents = bytes(range(256)) * N
1265 sizes = cycle([1, 19])
1266 n = 0
1267 queue = deque()
1268 while n < len(contents):
1269 size = next(sizes)
1270 queue.append(contents[n:n+size])
1271 n += size
1272 del contents
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001273 # We use a real file object because it allows us to
1274 # exercise situations where the GIL is released before
1275 # writing the buffer to the raw streams. This is in addition
1276 # to concurrency issues due to switching threads in the middle
1277 # of Python code.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001278 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001279 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001280 errors = []
1281 def f():
1282 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001283 while True:
1284 try:
1285 s = queue.popleft()
1286 except IndexError:
1287 return
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001288 bufio.write(s)
1289 except Exception as e:
1290 errors.append(e)
1291 raise
1292 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03001293 with support.start_threads(threads):
1294 time.sleep(0.02) # yield
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001295 self.assertFalse(errors,
1296 "the following exceptions were caught: %r" % errors)
Antoine Pitrou19690592009-06-12 20:14:08 +00001297 bufio.close()
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001298 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001299 s = f.read()
1300 for i in range(256):
Ezio Melotti2623a372010-11-21 13:34:58 +00001301 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001302 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001303 support.unlink(support.TESTFN)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001304
Antoine Pitrou19690592009-06-12 20:14:08 +00001305 def test_misbehaved_io(self):
1306 rawio = self.MisbehavedRawIO()
1307 bufio = self.tp(rawio, 5)
1308 self.assertRaises(IOError, bufio.seek, 0)
1309 self.assertRaises(IOError, bufio.tell)
1310 self.assertRaises(IOError, bufio.write, b"abcdef")
1311
1312 def test_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001313 with support.check_warnings(("max_buffer_size is deprecated",
1314 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001315 self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001316
Benjamin Petersona2d6d712012-12-20 12:24:10 -06001317 def test_write_error_on_close(self):
1318 raw = self.MockRawIO()
1319 def bad_write(b):
1320 raise IOError()
1321 raw.write = bad_write
1322 b = self.tp(raw)
1323 b.write(b'spam')
1324 self.assertRaises(IOError, b.close) # exception not swallowed
1325 self.assertTrue(b.closed)
1326
Antoine Pitrou19690592009-06-12 20:14:08 +00001327
Antoine Pitroubff5df02012-07-29 19:02:46 +02001328class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001329 tp = io.BufferedWriter
1330
1331 def test_constructor(self):
1332 BufferedWriterTest.test_constructor(self)
1333 # The allocation can succeed on 32-bit builds, e.g. with more
1334 # than 2GB RAM and a 64-bit kernel.
1335 if sys.maxsize > 0x7FFFFFFF:
1336 rawio = self.MockRawIO()
1337 bufio = self.tp(rawio)
1338 self.assertRaises((OverflowError, MemoryError, ValueError),
1339 bufio.__init__, rawio, sys.maxsize)
1340
1341 def test_initialization(self):
1342 rawio = self.MockRawIO()
1343 bufio = self.tp(rawio)
1344 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1345 self.assertRaises(ValueError, bufio.write, b"def")
1346 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1347 self.assertRaises(ValueError, bufio.write, b"def")
1348 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1349 self.assertRaises(ValueError, bufio.write, b"def")
1350
1351 def test_garbage_collection(self):
1352 # C BufferedWriter objects are collected, and collecting them flushes
1353 # all data to disk.
1354 # The Python version has __del__, so it ends into gc.garbage instead
1355 rawio = self.FileIO(support.TESTFN, "w+b")
1356 f = self.tp(rawio)
1357 f.write(b"123xxx")
1358 f.x = f
1359 wr = weakref.ref(f)
1360 del f
1361 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03001362 self.assertIsNone(wr(), wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001363 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001364 self.assertEqual(f.read(), b"123xxx")
1365
R David Murray5b2cf5e2013-02-23 22:11:21 -05001366 def test_args_error(self):
1367 # Issue #17275
1368 with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
1369 self.tp(io.BytesIO(), 1024, 1024, 1024)
1370
Antoine Pitrou19690592009-06-12 20:14:08 +00001371
1372class PyBufferedWriterTest(BufferedWriterTest):
1373 tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001374
1375class BufferedRWPairTest(unittest.TestCase):
1376
Antoine Pitrou19690592009-06-12 20:14:08 +00001377 def test_constructor(self):
1378 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001379 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001380
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001381 def test_uninitialized(self):
1382 pair = self.tp.__new__(self.tp)
1383 del pair
1384 pair = self.tp.__new__(self.tp)
1385 self.assertRaisesRegexp((ValueError, AttributeError),
1386 'uninitialized|has no attribute',
1387 pair.read, 0)
1388 self.assertRaisesRegexp((ValueError, AttributeError),
1389 'uninitialized|has no attribute',
1390 pair.write, b'')
1391 pair.__init__(self.MockRawIO(), self.MockRawIO())
1392 self.assertEqual(pair.read(0), b'')
1393 self.assertEqual(pair.write(b''), 0)
1394
Antoine Pitrou19690592009-06-12 20:14:08 +00001395 def test_detach(self):
1396 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1397 self.assertRaises(self.UnsupportedOperation, pair.detach)
1398
1399 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001400 with support.check_warnings(("max_buffer_size is deprecated",
1401 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001402 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001403
1404 def test_constructor_with_not_readable(self):
1405 class NotReadable(MockRawIO):
1406 def readable(self):
1407 return False
1408
1409 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1410
1411 def test_constructor_with_not_writeable(self):
1412 class NotWriteable(MockRawIO):
1413 def writable(self):
1414 return False
1415
1416 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1417
1418 def test_read(self):
1419 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1420
1421 self.assertEqual(pair.read(3), b"abc")
1422 self.assertEqual(pair.read(1), b"d")
1423 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001424 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1425 self.assertEqual(pair.read(None), b"abc")
1426
1427 def test_readlines(self):
1428 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1429 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1430 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1431 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001432
1433 def test_read1(self):
1434 # .read1() is delegated to the underlying reader object, so this test
1435 # can be shallow.
1436 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1437
1438 self.assertEqual(pair.read1(3), b"abc")
1439
1440 def test_readinto(self):
1441 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1442
1443 data = bytearray(5)
1444 self.assertEqual(pair.readinto(data), 5)
1445 self.assertEqual(data, b"abcde")
1446
1447 def test_write(self):
1448 w = self.MockRawIO()
1449 pair = self.tp(self.MockRawIO(), w)
1450
1451 pair.write(b"abc")
1452 pair.flush()
1453 pair.write(b"def")
1454 pair.flush()
1455 self.assertEqual(w._write_stack, [b"abc", b"def"])
1456
1457 def test_peek(self):
1458 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1459
1460 self.assertTrue(pair.peek(3).startswith(b"abc"))
1461 self.assertEqual(pair.read(3), b"abc")
1462
1463 def test_readable(self):
1464 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1465 self.assertTrue(pair.readable())
1466
1467 def test_writeable(self):
1468 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1469 self.assertTrue(pair.writable())
1470
1471 def test_seekable(self):
1472 # BufferedRWPairs are never seekable, even if their readers and writers
1473 # are.
1474 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1475 self.assertFalse(pair.seekable())
1476
1477 # .flush() is delegated to the underlying writer object and has been
1478 # tested in the test_write method.
1479
1480 def test_close_and_closed(self):
1481 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1482 self.assertFalse(pair.closed)
1483 pair.close()
1484 self.assertTrue(pair.closed)
1485
Serhiy Storchakaf95a57f2015-03-24 23:23:42 +02001486 def test_reader_close_error_on_close(self):
1487 def reader_close():
1488 reader_non_existing
1489 reader = self.MockRawIO()
1490 reader.close = reader_close
1491 writer = self.MockRawIO()
1492 pair = self.tp(reader, writer)
1493 with self.assertRaises(NameError) as err:
1494 pair.close()
1495 self.assertIn('reader_non_existing', str(err.exception))
1496 self.assertTrue(pair.closed)
1497 self.assertFalse(reader.closed)
1498 self.assertTrue(writer.closed)
1499
1500 def test_writer_close_error_on_close(self):
1501 def writer_close():
1502 writer_non_existing
1503 reader = self.MockRawIO()
1504 writer = self.MockRawIO()
1505 writer.close = writer_close
1506 pair = self.tp(reader, writer)
1507 with self.assertRaises(NameError) as err:
1508 pair.close()
1509 self.assertIn('writer_non_existing', str(err.exception))
1510 self.assertFalse(pair.closed)
1511 self.assertTrue(reader.closed)
1512 self.assertFalse(writer.closed)
1513
1514 def test_reader_writer_close_error_on_close(self):
1515 def reader_close():
1516 reader_non_existing
1517 def writer_close():
1518 writer_non_existing
1519 reader = self.MockRawIO()
1520 reader.close = reader_close
1521 writer = self.MockRawIO()
1522 writer.close = writer_close
1523 pair = self.tp(reader, writer)
1524 with self.assertRaises(NameError) as err:
1525 pair.close()
1526 self.assertIn('reader_non_existing', str(err.exception))
1527 self.assertFalse(pair.closed)
1528 self.assertFalse(reader.closed)
1529 self.assertFalse(writer.closed)
1530
Antoine Pitrou19690592009-06-12 20:14:08 +00001531 def test_isatty(self):
1532 class SelectableIsAtty(MockRawIO):
1533 def __init__(self, isatty):
1534 MockRawIO.__init__(self)
1535 self._isatty = isatty
1536
1537 def isatty(self):
1538 return self._isatty
1539
1540 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1541 self.assertFalse(pair.isatty())
1542
1543 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1544 self.assertTrue(pair.isatty())
1545
1546 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1547 self.assertTrue(pair.isatty())
1548
1549 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1550 self.assertTrue(pair.isatty())
1551
Benjamin Peterson1c873bf2014-09-29 22:46:57 -04001552 def test_weakref_clearing(self):
1553 brw = self.tp(self.MockRawIO(), self.MockRawIO())
1554 ref = weakref.ref(brw)
1555 brw = None
1556 ref = None # Shouldn't segfault.
1557
Antoine Pitrou19690592009-06-12 20:14:08 +00001558class CBufferedRWPairTest(BufferedRWPairTest):
1559 tp = io.BufferedRWPair
1560
1561class PyBufferedRWPairTest(BufferedRWPairTest):
1562 tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001563
1564
Antoine Pitrou19690592009-06-12 20:14:08 +00001565class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1566 read_mode = "rb+"
1567 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001568
Antoine Pitrou19690592009-06-12 20:14:08 +00001569 def test_constructor(self):
1570 BufferedReaderTest.test_constructor(self)
1571 BufferedWriterTest.test_constructor(self)
1572
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001573 def test_uninitialized(self):
1574 BufferedReaderTest.test_uninitialized(self)
1575 BufferedWriterTest.test_uninitialized(self)
1576
Antoine Pitrou19690592009-06-12 20:14:08 +00001577 def test_read_and_write(self):
1578 raw = self.MockRawIO((b"asdf", b"ghjk"))
1579 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001580
1581 self.assertEqual(b"as", rw.read(2))
1582 rw.write(b"ddd")
1583 rw.write(b"eee")
1584 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001585 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001586 self.assertEqual(b"dddeee", raw._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001587
Antoine Pitrou19690592009-06-12 20:14:08 +00001588 def test_seek_and_tell(self):
1589 raw = self.BytesIO(b"asdfghjkl")
1590 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001591
Ezio Melotti2623a372010-11-21 13:34:58 +00001592 self.assertEqual(b"as", rw.read(2))
1593 self.assertEqual(2, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001594 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001595 self.assertEqual(b"asdf", rw.read(4))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001596
Antoine Pitrou808cec52011-08-20 15:40:58 +02001597 rw.write(b"123f")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001598 rw.seek(0, 0)
Antoine Pitrou808cec52011-08-20 15:40:58 +02001599 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001600 self.assertEqual(9, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001601 rw.seek(-4, 2)
Ezio Melotti2623a372010-11-21 13:34:58 +00001602 self.assertEqual(5, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001603 rw.seek(2, 1)
Ezio Melotti2623a372010-11-21 13:34:58 +00001604 self.assertEqual(7, rw.tell())
1605 self.assertEqual(b"fl", rw.read(11))
Antoine Pitrou808cec52011-08-20 15:40:58 +02001606 rw.flush()
1607 self.assertEqual(b"asdf123fl", raw.getvalue())
1608
Christian Heimes1a6387e2008-03-26 12:49:49 +00001609 self.assertRaises(TypeError, rw.seek, 0.0)
1610
Antoine Pitrou19690592009-06-12 20:14:08 +00001611 def check_flush_and_read(self, read_func):
1612 raw = self.BytesIO(b"abcdefghi")
1613 bufio = self.tp(raw)
1614
Ezio Melotti2623a372010-11-21 13:34:58 +00001615 self.assertEqual(b"ab", read_func(bufio, 2))
Antoine Pitrou19690592009-06-12 20:14:08 +00001616 bufio.write(b"12")
Ezio Melotti2623a372010-11-21 13:34:58 +00001617 self.assertEqual(b"ef", read_func(bufio, 2))
1618 self.assertEqual(6, bufio.tell())
Antoine Pitrou19690592009-06-12 20:14:08 +00001619 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001620 self.assertEqual(6, bufio.tell())
1621 self.assertEqual(b"ghi", read_func(bufio))
Antoine Pitrou19690592009-06-12 20:14:08 +00001622 raw.seek(0, 0)
1623 raw.write(b"XYZ")
1624 # flush() resets the read buffer
1625 bufio.flush()
1626 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001627 self.assertEqual(b"XYZ", read_func(bufio, 3))
Antoine Pitrou19690592009-06-12 20:14:08 +00001628
1629 def test_flush_and_read(self):
1630 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1631
1632 def test_flush_and_readinto(self):
1633 def _readinto(bufio, n=-1):
1634 b = bytearray(n if n >= 0 else 9999)
1635 n = bufio.readinto(b)
1636 return bytes(b[:n])
1637 self.check_flush_and_read(_readinto)
1638
1639 def test_flush_and_peek(self):
1640 def _peek(bufio, n=-1):
1641 # This relies on the fact that the buffer can contain the whole
1642 # raw stream, otherwise peek() can return less.
1643 b = bufio.peek(n)
1644 if n != -1:
1645 b = b[:n]
1646 bufio.seek(len(b), 1)
1647 return b
1648 self.check_flush_and_read(_peek)
1649
1650 def test_flush_and_write(self):
1651 raw = self.BytesIO(b"abcdefghi")
1652 bufio = self.tp(raw)
1653
1654 bufio.write(b"123")
1655 bufio.flush()
1656 bufio.write(b"45")
1657 bufio.flush()
1658 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001659 self.assertEqual(b"12345fghi", raw.getvalue())
1660 self.assertEqual(b"12345fghi", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +00001661
1662 def test_threads(self):
1663 BufferedReaderTest.test_threads(self)
1664 BufferedWriterTest.test_threads(self)
1665
1666 def test_writes_and_peek(self):
1667 def _peek(bufio):
1668 bufio.peek(1)
1669 self.check_writes(_peek)
1670 def _peek(bufio):
1671 pos = bufio.tell()
1672 bufio.seek(-1, 1)
1673 bufio.peek(1)
1674 bufio.seek(pos, 0)
1675 self.check_writes(_peek)
1676
1677 def test_writes_and_reads(self):
1678 def _read(bufio):
1679 bufio.seek(-1, 1)
1680 bufio.read(1)
1681 self.check_writes(_read)
1682
1683 def test_writes_and_read1s(self):
1684 def _read1(bufio):
1685 bufio.seek(-1, 1)
1686 bufio.read1(1)
1687 self.check_writes(_read1)
1688
1689 def test_writes_and_readintos(self):
1690 def _read(bufio):
1691 bufio.seek(-1, 1)
1692 bufio.readinto(bytearray(1))
1693 self.check_writes(_read)
1694
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001695 def test_write_after_readahead(self):
1696 # Issue #6629: writing after the buffer was filled by readahead should
1697 # first rewind the raw stream.
1698 for overwrite_size in [1, 5]:
1699 raw = self.BytesIO(b"A" * 10)
1700 bufio = self.tp(raw, 4)
1701 # Trigger readahead
1702 self.assertEqual(bufio.read(1), b"A")
1703 self.assertEqual(bufio.tell(), 1)
1704 # Overwriting should rewind the raw stream if it needs so
1705 bufio.write(b"B" * overwrite_size)
1706 self.assertEqual(bufio.tell(), overwrite_size + 1)
1707 # If the write size was smaller than the buffer size, flush() and
1708 # check that rewind happens.
1709 bufio.flush()
1710 self.assertEqual(bufio.tell(), overwrite_size + 1)
1711 s = raw.getvalue()
1712 self.assertEqual(s,
1713 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1714
Antoine Pitrouee46a7b2011-05-13 00:31:52 +02001715 def test_write_rewind_write(self):
1716 # Various combinations of reading / writing / seeking backwards / writing again
1717 def mutate(bufio, pos1, pos2):
1718 assert pos2 >= pos1
1719 # Fill the buffer
1720 bufio.seek(pos1)
1721 bufio.read(pos2 - pos1)
1722 bufio.write(b'\x02')
1723 # This writes earlier than the previous write, but still inside
1724 # the buffer.
1725 bufio.seek(pos1)
1726 bufio.write(b'\x01')
1727
1728 b = b"\x80\x81\x82\x83\x84"
1729 for i in range(0, len(b)):
1730 for j in range(i, len(b)):
1731 raw = self.BytesIO(b)
1732 bufio = self.tp(raw, 100)
1733 mutate(bufio, i, j)
1734 bufio.flush()
1735 expected = bytearray(b)
1736 expected[j] = 2
1737 expected[i] = 1
1738 self.assertEqual(raw.getvalue(), expected,
1739 "failed result for i=%d, j=%d" % (i, j))
1740
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001741 def test_truncate_after_read_or_write(self):
1742 raw = self.BytesIO(b"A" * 10)
1743 bufio = self.tp(raw, 100)
1744 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1745 self.assertEqual(bufio.truncate(), 2)
1746 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1747 self.assertEqual(bufio.truncate(), 4)
1748
Antoine Pitrou19690592009-06-12 20:14:08 +00001749 def test_misbehaved_io(self):
1750 BufferedReaderTest.test_misbehaved_io(self)
1751 BufferedWriterTest.test_misbehaved_io(self)
1752
Antoine Pitrou808cec52011-08-20 15:40:58 +02001753 def test_interleaved_read_write(self):
1754 # Test for issue #12213
1755 with self.BytesIO(b'abcdefgh') as raw:
1756 with self.tp(raw, 100) as f:
1757 f.write(b"1")
1758 self.assertEqual(f.read(1), b'b')
1759 f.write(b'2')
1760 self.assertEqual(f.read1(1), b'd')
1761 f.write(b'3')
1762 buf = bytearray(1)
1763 f.readinto(buf)
1764 self.assertEqual(buf, b'f')
1765 f.write(b'4')
1766 self.assertEqual(f.peek(1), b'h')
1767 f.flush()
1768 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1769
1770 with self.BytesIO(b'abc') as raw:
1771 with self.tp(raw, 100) as f:
1772 self.assertEqual(f.read(1), b'a')
1773 f.write(b"2")
1774 self.assertEqual(f.read(1), b'c')
1775 f.flush()
1776 self.assertEqual(raw.getvalue(), b'a2c')
1777
1778 def test_interleaved_readline_write(self):
1779 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1780 with self.tp(raw) as f:
1781 f.write(b'1')
1782 self.assertEqual(f.readline(), b'b\n')
1783 f.write(b'2')
1784 self.assertEqual(f.readline(), b'def\n')
1785 f.write(b'3')
1786 self.assertEqual(f.readline(), b'\n')
1787 f.flush()
1788 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1789
R David Murray5b2cf5e2013-02-23 22:11:21 -05001790
Antoine Pitroubff5df02012-07-29 19:02:46 +02001791class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
1792 BufferedRandomTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001793 tp = io.BufferedRandom
1794
1795 def test_constructor(self):
1796 BufferedRandomTest.test_constructor(self)
1797 # The allocation can succeed on 32-bit builds, e.g. with more
1798 # than 2GB RAM and a 64-bit kernel.
1799 if sys.maxsize > 0x7FFFFFFF:
1800 rawio = self.MockRawIO()
1801 bufio = self.tp(rawio)
1802 self.assertRaises((OverflowError, MemoryError, ValueError),
1803 bufio.__init__, rawio, sys.maxsize)
1804
1805 def test_garbage_collection(self):
1806 CBufferedReaderTest.test_garbage_collection(self)
1807 CBufferedWriterTest.test_garbage_collection(self)
1808
R David Murray5b2cf5e2013-02-23 22:11:21 -05001809 def test_args_error(self):
1810 # Issue #17275
1811 with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
1812 self.tp(io.BytesIO(), 1024, 1024, 1024)
1813
1814
Antoine Pitrou19690592009-06-12 20:14:08 +00001815class PyBufferedRandomTest(BufferedRandomTest):
1816 tp = pyio.BufferedRandom
1817
1818
Christian Heimes1a6387e2008-03-26 12:49:49 +00001819# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1820# properties:
1821# - A single output character can correspond to many bytes of input.
1822# - The number of input bytes to complete the character can be
1823# undetermined until the last input byte is received.
1824# - The number of input bytes can vary depending on previous input.
1825# - A single input byte can correspond to many characters of output.
1826# - The number of output characters can be undetermined until the
1827# last input byte is received.
1828# - The number of output characters can vary depending on previous input.
1829
1830class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1831 """
1832 For testing seek/tell behavior with a stateful, buffering decoder.
1833
1834 Input is a sequence of words. Words may be fixed-length (length set
1835 by input) or variable-length (period-terminated). In variable-length
1836 mode, extra periods are ignored. Possible words are:
1837 - 'i' followed by a number sets the input length, I (maximum 99).
1838 When I is set to 0, words are space-terminated.
1839 - 'o' followed by a number sets the output length, O (maximum 99).
1840 - Any other word is converted into a word followed by a period on
1841 the output. The output word consists of the input word truncated
1842 or padded out with hyphens to make its length equal to O. If O
1843 is 0, the word is output verbatim without truncating or padding.
1844 I and O are initially set to 1. When I changes, any buffered input is
1845 re-scanned according to the new I. EOF also terminates the last word.
1846 """
1847
1848 def __init__(self, errors='strict'):
1849 codecs.IncrementalDecoder.__init__(self, errors)
1850 self.reset()
1851
1852 def __repr__(self):
1853 return '<SID %x>' % id(self)
1854
1855 def reset(self):
1856 self.i = 1
1857 self.o = 1
1858 self.buffer = bytearray()
1859
1860 def getstate(self):
1861 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1862 return bytes(self.buffer), i*100 + o
1863
1864 def setstate(self, state):
1865 buffer, io = state
1866 self.buffer = bytearray(buffer)
1867 i, o = divmod(io, 100)
1868 self.i, self.o = i ^ 1, o ^ 1
1869
1870 def decode(self, input, final=False):
1871 output = ''
1872 for b in input:
1873 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001874 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +00001875 if self.buffer:
1876 output += self.process_word()
1877 else:
1878 self.buffer.append(b)
1879 else: # fixed-length, terminate after self.i bytes
1880 self.buffer.append(b)
1881 if len(self.buffer) == self.i:
1882 output += self.process_word()
1883 if final and self.buffer: # EOF terminates the last word
1884 output += self.process_word()
1885 return output
1886
1887 def process_word(self):
1888 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001889 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001890 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001891 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001892 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1893 else:
1894 output = self.buffer.decode('ascii')
1895 if len(output) < self.o:
1896 output += '-'*self.o # pad out with hyphens
1897 if self.o:
1898 output = output[:self.o] # truncate to output length
1899 output += '.'
1900 self.buffer = bytearray()
1901 return output
1902
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001903 codecEnabled = False
1904
1905 @classmethod
1906 def lookupTestDecoder(cls, name):
1907 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001908 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001909 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001910 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001911 incrementalencoder=None,
1912 streamreader=None, streamwriter=None,
1913 incrementaldecoder=cls)
1914
1915# Register the previous decoder for testing.
1916# Disabled by default, tests will enable it.
1917codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1918
1919
Christian Heimes1a6387e2008-03-26 12:49:49 +00001920class StatefulIncrementalDecoderTest(unittest.TestCase):
1921 """
1922 Make sure the StatefulIncrementalDecoder actually works.
1923 """
1924
1925 test_cases = [
1926 # I=1, O=1 (fixed-length input == fixed-length output)
1927 (b'abcd', False, 'a.b.c.d.'),
1928 # I=0, O=0 (variable-length input, variable-length output)
1929 (b'oiabcd', True, 'abcd.'),
1930 # I=0, O=0 (should ignore extra periods)
1931 (b'oi...abcd...', True, 'abcd.'),
1932 # I=0, O=6 (variable-length input, fixed-length output)
1933 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1934 # I=2, O=6 (fixed-length input < fixed-length output)
1935 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1936 # I=6, O=3 (fixed-length input > fixed-length output)
1937 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1938 # I=0, then 3; O=29, then 15 (with longer output)
1939 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1940 'a----------------------------.' +
1941 'b----------------------------.' +
1942 'cde--------------------------.' +
1943 'abcdefghijabcde.' +
1944 'a.b------------.' +
1945 '.c.------------.' +
1946 'd.e------------.' +
1947 'k--------------.' +
1948 'l--------------.' +
1949 'm--------------.')
1950 ]
1951
Antoine Pitrou19690592009-06-12 20:14:08 +00001952 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001953 # Try a few one-shot test cases.
1954 for input, eof, output in self.test_cases:
1955 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001956 self.assertEqual(d.decode(input, eof), output)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001957
1958 # Also test an unfinished decode, followed by forcing EOF.
1959 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001960 self.assertEqual(d.decode(b'oiabcd'), '')
1961 self.assertEqual(d.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001962
1963class TextIOWrapperTest(unittest.TestCase):
1964
1965 def setUp(self):
1966 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1967 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00001968 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001969
1970 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001971 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001972
Antoine Pitrou19690592009-06-12 20:14:08 +00001973 def test_constructor(self):
1974 r = self.BytesIO(b"\xc3\xa9\n\n")
1975 b = self.BufferedReader(r, 1000)
1976 t = self.TextIOWrapper(b)
1977 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti2623a372010-11-21 13:34:58 +00001978 self.assertEqual(t.encoding, "latin1")
1979 self.assertEqual(t.line_buffering, False)
Antoine Pitrou19690592009-06-12 20:14:08 +00001980 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti2623a372010-11-21 13:34:58 +00001981 self.assertEqual(t.encoding, "utf8")
1982 self.assertEqual(t.line_buffering, True)
1983 self.assertEqual("\xe9\n", t.readline())
Antoine Pitrou19690592009-06-12 20:14:08 +00001984 self.assertRaises(TypeError, t.__init__, b, newline=42)
1985 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1986
Serhiy Storchakaabb7e652015-04-16 11:56:35 +03001987 def test_uninitialized(self):
1988 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
1989 del t
1990 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
1991 self.assertRaises(Exception, repr, t)
1992 self.assertRaisesRegexp((ValueError, AttributeError),
1993 'uninitialized|has no attribute',
1994 t.read, 0)
1995 t.__init__(self.MockRawIO())
1996 self.assertEqual(t.read(0), u'')
1997
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03001998 def test_non_text_encoding_codecs_are_rejected(self):
1999 # Ensure the constructor complains if passed a codec that isn't
2000 # marked as a text encoding
2001 # http://bugs.python.org/issue20404
2002 r = self.BytesIO()
2003 b = self.BufferedWriter(r)
2004 with support.check_py3k_warnings():
2005 self.TextIOWrapper(b, encoding="hex_codec")
2006
Antoine Pitrou19690592009-06-12 20:14:08 +00002007 def test_detach(self):
2008 r = self.BytesIO()
2009 b = self.BufferedWriter(r)
2010 t = self.TextIOWrapper(b)
2011 self.assertIs(t.detach(), b)
2012
2013 t = self.TextIOWrapper(b, encoding="ascii")
2014 t.write("howdy")
2015 self.assertFalse(r.getvalue())
2016 t.detach()
2017 self.assertEqual(r.getvalue(), b"howdy")
2018 self.assertRaises(ValueError, t.detach)
2019
Benjamin Peterson53ae6142014-12-21 20:51:50 -06002020 # Operations independent of the detached stream should still work
2021 repr(t)
2022 self.assertEqual(t.encoding, "ascii")
2023 self.assertEqual(t.errors, "strict")
2024 self.assertFalse(t.line_buffering)
2025
Antoine Pitrou19690592009-06-12 20:14:08 +00002026 def test_repr(self):
2027 raw = self.BytesIO("hello".encode("utf-8"))
2028 b = self.BufferedReader(raw)
2029 t = self.TextIOWrapper(b, encoding="utf-8")
2030 modname = self.TextIOWrapper.__module__
2031 self.assertEqual(repr(t),
2032 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2033 raw.name = "dummy"
2034 self.assertEqual(repr(t),
2035 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
2036 raw.name = b"dummy"
2037 self.assertEqual(repr(t),
2038 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
2039
Benjamin Peterson53ae6142014-12-21 20:51:50 -06002040 t.buffer.detach()
2041 repr(t) # Should not raise an exception
2042
Antoine Pitrou19690592009-06-12 20:14:08 +00002043 def test_line_buffering(self):
2044 r = self.BytesIO()
2045 b = self.BufferedWriter(r, 1000)
2046 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
2047 t.write("X")
Ezio Melotti2623a372010-11-21 13:34:58 +00002048 self.assertEqual(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00002049 t.write("Y\nZ")
Ezio Melotti2623a372010-11-21 13:34:58 +00002050 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00002051 t.write("A\rB")
Ezio Melotti2623a372010-11-21 13:34:58 +00002052 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002053
Antoine Pitrou19690592009-06-12 20:14:08 +00002054 def test_encoding(self):
2055 # Check the encoding attribute is always set, and valid
2056 b = self.BytesIO()
2057 t = self.TextIOWrapper(b, encoding="utf8")
2058 self.assertEqual(t.encoding, "utf8")
2059 t = self.TextIOWrapper(b)
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03002060 self.assertIsNotNone(t.encoding)
Antoine Pitrou19690592009-06-12 20:14:08 +00002061 codecs.lookup(t.encoding)
2062
2063 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002064 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00002065 b = self.BytesIO(b"abc\n\xff\n")
2066 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002067 self.assertRaises(UnicodeError, t.read)
2068 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00002069 b = self.BytesIO(b"abc\n\xff\n")
2070 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002071 self.assertRaises(UnicodeError, t.read)
2072 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00002073 b = self.BytesIO(b"abc\n\xff\n")
2074 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti2623a372010-11-21 13:34:58 +00002075 self.assertEqual(t.read(), "abc\n\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002076 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00002077 b = self.BytesIO(b"abc\n\xff\n")
2078 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti2623a372010-11-21 13:34:58 +00002079 self.assertEqual(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002080
Antoine Pitrou19690592009-06-12 20:14:08 +00002081 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002082 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00002083 b = self.BytesIO()
2084 t = self.TextIOWrapper(b, encoding="ascii")
2085 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002086 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00002087 b = self.BytesIO()
2088 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2089 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002090 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00002091 b = self.BytesIO()
2092 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00002093 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00002094 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002095 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002096 self.assertEqual(b.getvalue(), b"abcdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002097 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00002098 b = self.BytesIO()
2099 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00002100 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00002101 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002102 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002103 self.assertEqual(b.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002104
Antoine Pitrou19690592009-06-12 20:14:08 +00002105 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002106 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2107
2108 tests = [
2109 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
2110 [ '', input_lines ],
2111 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2112 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2113 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
2114 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002115 encodings = (
2116 'utf-8', 'latin-1',
2117 'utf-16', 'utf-16-le', 'utf-16-be',
2118 'utf-32', 'utf-32-le', 'utf-32-be',
2119 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00002120
2121 # Try a range of buffer sizes to test the case where \r is the last
2122 # character in TextIOWrapper._pending_line.
2123 for encoding in encodings:
2124 # XXX: str.encode() should return bytes
2125 data = bytes(''.join(input_lines).encode(encoding))
2126 for do_reads in (False, True):
2127 for bufsize in range(1, 10):
2128 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002129 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2130 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00002131 encoding=encoding)
2132 if do_reads:
2133 got_lines = []
2134 while True:
2135 c2 = textio.read(2)
2136 if c2 == '':
2137 break
Ezio Melotti2623a372010-11-21 13:34:58 +00002138 self.assertEqual(len(c2), 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002139 got_lines.append(c2 + textio.readline())
2140 else:
2141 got_lines = list(textio)
2142
2143 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti2623a372010-11-21 13:34:58 +00002144 self.assertEqual(got_line, exp_line)
2145 self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002146
Antoine Pitrou19690592009-06-12 20:14:08 +00002147 def test_newlines_input(self):
2148 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002149 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2150 for newline, expected in [
2151 (None, normalized.decode("ascii").splitlines(True)),
2152 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00002153 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2154 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2155 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00002156 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00002157 buf = self.BytesIO(testdata)
2158 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti2623a372010-11-21 13:34:58 +00002159 self.assertEqual(txt.readlines(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002160 txt.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002161 self.assertEqual(txt.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002162
Antoine Pitrou19690592009-06-12 20:14:08 +00002163 def test_newlines_output(self):
2164 testdict = {
2165 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2166 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2167 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2168 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2169 }
2170 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2171 for newline, expected in tests:
2172 buf = self.BytesIO()
2173 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2174 txt.write("AAA\nB")
2175 txt.write("BB\nCCC\n")
2176 txt.write("X\rY\r\nZ")
2177 txt.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002178 self.assertEqual(buf.closed, False)
2179 self.assertEqual(buf.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00002180
2181 def test_destructor(self):
2182 l = []
2183 base = self.BytesIO
2184 class MyBytesIO(base):
2185 def close(self):
2186 l.append(self.getvalue())
2187 base.close(self)
2188 b = MyBytesIO()
2189 t = self.TextIOWrapper(b, encoding="ascii")
2190 t.write("abc")
2191 del t
2192 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00002193 self.assertEqual([b"abc"], l)
Antoine Pitrou19690592009-06-12 20:14:08 +00002194
2195 def test_override_destructor(self):
2196 record = []
2197 class MyTextIO(self.TextIOWrapper):
2198 def __del__(self):
2199 record.append(1)
2200 try:
2201 f = super(MyTextIO, self).__del__
2202 except AttributeError:
2203 pass
2204 else:
2205 f()
2206 def close(self):
2207 record.append(2)
2208 super(MyTextIO, self).close()
2209 def flush(self):
2210 record.append(3)
2211 super(MyTextIO, self).flush()
2212 b = self.BytesIO()
2213 t = MyTextIO(b, encoding="ascii")
2214 del t
2215 support.gc_collect()
2216 self.assertEqual(record, [1, 2, 3])
2217
2218 def test_error_through_destructor(self):
2219 # Test that the exception state is not modified by a destructor,
2220 # even if close() fails.
2221 rawio = self.CloseFailureIO()
2222 def f():
2223 self.TextIOWrapper(rawio).xyzzy
2224 with support.captured_output("stderr") as s:
2225 self.assertRaises(AttributeError, f)
2226 s = s.getvalue().strip()
2227 if s:
2228 # The destructor *may* have printed an unraisable error, check it
2229 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002230 self.assertTrue(s.startswith("Exception IOError: "), s)
2231 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002232
2233 # Systematic tests of the text I/O API
2234
Antoine Pitrou19690592009-06-12 20:14:08 +00002235 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002236 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2237 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00002238 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002239 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002240 self.assertEqual(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002241 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002242 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002243 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002244 self.assertEqual(f.tell(), 0)
2245 self.assertEqual(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002246 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002247 self.assertEqual(f.seek(0), 0)
2248 self.assertEqual(f.read(None), "abc")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002249 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002250 self.assertEqual(f.read(2), "ab")
2251 self.assertEqual(f.read(1), "c")
2252 self.assertEqual(f.read(1), "")
2253 self.assertEqual(f.read(), "")
2254 self.assertEqual(f.tell(), cookie)
2255 self.assertEqual(f.seek(0), 0)
2256 self.assertEqual(f.seek(0, 2), cookie)
2257 self.assertEqual(f.write("def"), 3)
2258 self.assertEqual(f.seek(cookie), cookie)
2259 self.assertEqual(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002260 if enc.startswith("utf"):
2261 self.multi_line_test(f, enc)
2262 f.close()
2263
2264 def multi_line_test(self, f, enc):
2265 f.seek(0)
2266 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00002267 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002268 wlines = []
2269 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2270 chars = []
2271 for i in range(size):
2272 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00002273 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002274 wlines.append((f.tell(), line))
2275 f.write(line)
2276 f.seek(0)
2277 rlines = []
2278 while True:
2279 pos = f.tell()
2280 line = f.readline()
2281 if not line:
2282 break
2283 rlines.append((pos, line))
Ezio Melotti2623a372010-11-21 13:34:58 +00002284 self.assertEqual(rlines, wlines)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002285
Antoine Pitrou19690592009-06-12 20:14:08 +00002286 def test_telling(self):
2287 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002288 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002289 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002290 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002291 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002292 p2 = f.tell()
2293 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002294 self.assertEqual(f.tell(), p0)
2295 self.assertEqual(f.readline(), "\xff\n")
2296 self.assertEqual(f.tell(), p1)
2297 self.assertEqual(f.readline(), "\xff\n")
2298 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002299 f.seek(0)
2300 for line in f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002301 self.assertEqual(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002302 self.assertRaises(IOError, f.tell)
Ezio Melotti2623a372010-11-21 13:34:58 +00002303 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002304 f.close()
2305
Antoine Pitrou19690592009-06-12 20:14:08 +00002306 def test_seeking(self):
2307 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00002308 prefix_size = chunk_size - 2
2309 u_prefix = "a" * prefix_size
2310 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melotti2623a372010-11-21 13:34:58 +00002311 self.assertEqual(len(u_prefix), len(prefix))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002312 u_suffix = "\u8888\n"
2313 suffix = bytes(u_suffix.encode("utf-8"))
2314 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00002315 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002316 f.write(line*2)
2317 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002318 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002319 s = f.read(prefix_size)
Ezio Melotti2623a372010-11-21 13:34:58 +00002320 self.assertEqual(s, prefix.decode("ascii"))
2321 self.assertEqual(f.tell(), prefix_size)
2322 self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002323
Antoine Pitrou19690592009-06-12 20:14:08 +00002324 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002325 # Regression test for a specific bug
2326 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00002327 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002328 f.write(data)
2329 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002330 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002331 f._CHUNK_SIZE # Just test that it exists
2332 f._CHUNK_SIZE = 2
2333 f.readline()
2334 f.tell()
2335
Antoine Pitrou19690592009-06-12 20:14:08 +00002336 def test_seek_and_tell(self):
2337 #Test seek/tell using the StatefulIncrementalDecoder.
2338 # Make test faster by doing smaller seeks
2339 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00002340
Antoine Pitrou19690592009-06-12 20:14:08 +00002341 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002342 """Tell/seek to various points within a data stream and ensure
2343 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00002344 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00002345 f.write(data)
2346 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002347 f = self.open(support.TESTFN, encoding='test_decoder')
2348 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00002349 decoded = f.read()
2350 f.close()
2351
2352 for i in range(min_pos, len(decoded) + 1): # seek positions
2353 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00002354 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melotti2623a372010-11-21 13:34:58 +00002355 self.assertEqual(f.read(i), decoded[:i])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002356 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002357 self.assertEqual(f.read(j), decoded[i:i + j])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002358 f.seek(cookie)
Ezio Melotti2623a372010-11-21 13:34:58 +00002359 self.assertEqual(f.read(), decoded[i:])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002360 f.close()
2361
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002362 # Enable the test decoder.
2363 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00002364
2365 # Run the tests.
2366 try:
2367 # Try each test case.
2368 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00002369 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002370
2371 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00002372 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2373 offset = CHUNK_SIZE - len(input)//2
2374 prefix = b'.'*offset
2375 # Don't bother seeking into the prefix (takes too long).
2376 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00002377 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002378
2379 # Ensure our test decoder won't interfere with subsequent tests.
2380 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002381 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002382
Antoine Pitrou19690592009-06-12 20:14:08 +00002383 def test_encoded_writes(self):
2384 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002385 tests = ("utf-16",
2386 "utf-16-le",
2387 "utf-16-be",
2388 "utf-32",
2389 "utf-32-le",
2390 "utf-32-be")
2391 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002392 buf = self.BytesIO()
2393 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002394 # Check if the BOM is written only once (see issue1753).
2395 f.write(data)
2396 f.write(data)
2397 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002398 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002399 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002400 self.assertEqual(f.read(), data * 2)
2401 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002402
Antoine Pitrou19690592009-06-12 20:14:08 +00002403 def test_unreadable(self):
2404 class UnReadable(self.BytesIO):
2405 def readable(self):
2406 return False
2407 txt = self.TextIOWrapper(UnReadable())
2408 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002409
Antoine Pitrou19690592009-06-12 20:14:08 +00002410 def test_read_one_by_one(self):
2411 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002412 reads = ""
2413 while True:
2414 c = txt.read(1)
2415 if not c:
2416 break
2417 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002418 self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002419
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002420 def test_readlines(self):
2421 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2422 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2423 txt.seek(0)
2424 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2425 txt.seek(0)
2426 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2427
Christian Heimes1a6387e2008-03-26 12:49:49 +00002428 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002429 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002430 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002431 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002432 reads = ""
2433 while True:
2434 c = txt.read(128)
2435 if not c:
2436 break
2437 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002438 self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002439
Antoine Pitroueadca1d2012-10-16 23:02:27 +02002440 def test_writelines(self):
2441 l = ['ab', 'cd', 'ef']
2442 buf = self.BytesIO()
2443 txt = self.TextIOWrapper(buf)
2444 txt.writelines(l)
2445 txt.flush()
2446 self.assertEqual(buf.getvalue(), b'abcdef')
2447
2448 def test_writelines_userlist(self):
2449 l = UserList(['ab', 'cd', 'ef'])
2450 buf = self.BytesIO()
2451 txt = self.TextIOWrapper(buf)
2452 txt.writelines(l)
2453 txt.flush()
2454 self.assertEqual(buf.getvalue(), b'abcdef')
2455
2456 def test_writelines_error(self):
2457 txt = self.TextIOWrapper(self.BytesIO())
2458 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2459 self.assertRaises(TypeError, txt.writelines, None)
2460 self.assertRaises(TypeError, txt.writelines, b'abc')
2461
Christian Heimes1a6387e2008-03-26 12:49:49 +00002462 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002463 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002464
2465 # read one char at a time
2466 reads = ""
2467 while True:
2468 c = txt.read(1)
2469 if not c:
2470 break
2471 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002472 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002473
2474 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002475 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002476 txt._CHUNK_SIZE = 4
2477
2478 reads = ""
2479 while True:
2480 c = txt.read(4)
2481 if not c:
2482 break
2483 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002484 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002485
2486 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002487 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002488 txt._CHUNK_SIZE = 4
2489
2490 reads = txt.read(4)
2491 reads += txt.read(4)
2492 reads += txt.readline()
2493 reads += txt.readline()
2494 reads += txt.readline()
Ezio Melotti2623a372010-11-21 13:34:58 +00002495 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002496
2497 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002498 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002499 txt._CHUNK_SIZE = 4
2500
2501 reads = txt.read(4)
2502 reads += txt.read()
Ezio Melotti2623a372010-11-21 13:34:58 +00002503 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002504
2505 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002506 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002507 txt._CHUNK_SIZE = 4
2508
2509 reads = txt.read(4)
2510 pos = txt.tell()
2511 txt.seek(0)
2512 txt.seek(pos)
Ezio Melotti2623a372010-11-21 13:34:58 +00002513 self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002514
2515 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002516 buffer = self.BytesIO(self.testdata)
2517 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002518
2519 self.assertEqual(buffer.seekable(), txt.seekable())
2520
Antoine Pitrou19690592009-06-12 20:14:08 +00002521 def test_append_bom(self):
2522 # The BOM is not written again when appending to a non-empty file
2523 filename = support.TESTFN
2524 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2525 with self.open(filename, 'w', encoding=charset) as f:
2526 f.write('aaa')
2527 pos = f.tell()
2528 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002529 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002530
2531 with self.open(filename, 'a', encoding=charset) as f:
2532 f.write('xxx')
2533 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002534 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002535
Antoine Pitrou19690592009-06-12 20:14:08 +00002536 def test_seek_bom(self):
2537 # Same test, but when seeking manually
2538 filename = support.TESTFN
2539 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2540 with self.open(filename, 'w', encoding=charset) as f:
2541 f.write('aaa')
2542 pos = f.tell()
2543 with self.open(filename, 'r+', encoding=charset) as f:
2544 f.seek(pos)
2545 f.write('zzz')
2546 f.seek(0)
2547 f.write('bbb')
2548 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002549 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002550
2551 def test_errors_property(self):
2552 with self.open(support.TESTFN, "w") as f:
2553 self.assertEqual(f.errors, "strict")
2554 with self.open(support.TESTFN, "w", errors="replace") as f:
2555 self.assertEqual(f.errors, "replace")
2556
Victor Stinner6a102812010-04-27 23:55:59 +00002557 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002558 def test_threads_write(self):
2559 # Issue6750: concurrent writes could duplicate data
2560 event = threading.Event()
2561 with self.open(support.TESTFN, "w", buffering=1) as f:
2562 def run(n):
2563 text = "Thread%03d\n" % n
2564 event.wait()
2565 f.write(text)
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03002566 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002567 for x in range(20)]
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03002568 with support.start_threads(threads, event.set):
2569 time.sleep(0.02)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002570 with self.open(support.TESTFN) as f:
2571 content = f.read()
2572 for n in range(20):
Ezio Melotti2623a372010-11-21 13:34:58 +00002573 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002574
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002575 def test_flush_error_on_close(self):
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002576 # Test that text file is closed despite failed flush
2577 # and that flush() is called before file closed.
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002578 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002579 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002580 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002581 closed[:] = [txt.closed, txt.buffer.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002582 raise IOError()
2583 txt.flush = bad_flush
2584 self.assertRaises(IOError, txt.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -06002585 self.assertTrue(txt.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002586 self.assertTrue(txt.buffer.closed)
2587 self.assertTrue(closed) # flush() called
2588 self.assertFalse(closed[0]) # flush() called before file closed
2589 self.assertFalse(closed[1])
Serhiy Storchaka437d5352015-02-23 00:28:38 +02002590 txt.flush = lambda: None # break reference loop
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002591
2592 def test_multi_close(self):
2593 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2594 txt.close()
2595 txt.close()
2596 txt.close()
2597 self.assertRaises(ValueError, txt.flush)
2598
Antoine Pitroufc9ead62010-12-21 21:26:55 +00002599 def test_readonly_attributes(self):
2600 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2601 buf = self.BytesIO(self.testdata)
2602 with self.assertRaises((AttributeError, TypeError)):
2603 txt.buffer = buf
2604
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002605 def test_read_nonbytes(self):
2606 # Issue #17106
2607 # Crash when underlying read() returns non-bytes
2608 class NonbytesStream(self.StringIO):
2609 read1 = self.StringIO.read
2610 class NonbytesStream(self.StringIO):
2611 read1 = self.StringIO.read
2612 t = self.TextIOWrapper(NonbytesStream('a'))
2613 with self.maybeRaises(TypeError):
2614 t.read(1)
2615 t = self.TextIOWrapper(NonbytesStream('a'))
2616 with self.maybeRaises(TypeError):
2617 t.readline()
2618 t = self.TextIOWrapper(NonbytesStream('a'))
2619 self.assertEqual(t.read(), u'a')
2620
2621 def test_illegal_decoder(self):
2622 # Issue #17106
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002623 # Bypass the early encoding check added in issue 20404
2624 def _make_illegal_wrapper():
2625 quopri = codecs.lookup("quopri_codec")
2626 quopri._is_text_encoding = True
2627 try:
2628 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
2629 newline='\n', encoding="quopri_codec")
2630 finally:
2631 quopri._is_text_encoding = False
2632 return t
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002633 # Crash when decoder returns non-string
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002634 with support.check_py3k_warnings():
2635 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2636 encoding='quopri_codec')
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002637 with self.maybeRaises(TypeError):
2638 t.read(1)
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002639 with support.check_py3k_warnings():
2640 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2641 encoding='quopri_codec')
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002642 with self.maybeRaises(TypeError):
2643 t.readline()
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002644 with support.check_py3k_warnings():
2645 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2646 encoding='quopri_codec')
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002647 with self.maybeRaises(TypeError):
2648 t.read()
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002649 #else:
2650 #t = _make_illegal_wrapper()
2651 #self.assertRaises(TypeError, t.read, 1)
2652 #t = _make_illegal_wrapper()
2653 #self.assertRaises(TypeError, t.readline)
2654 #t = _make_illegal_wrapper()
2655 #self.assertRaises(TypeError, t.read)
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002656
2657
Antoine Pitrou19690592009-06-12 20:14:08 +00002658class CTextIOWrapperTest(TextIOWrapperTest):
2659
2660 def test_initialization(self):
2661 r = self.BytesIO(b"\xc3\xa9\n\n")
2662 b = self.BufferedReader(r, 1000)
2663 t = self.TextIOWrapper(b)
2664 self.assertRaises(TypeError, t.__init__, b, newline=42)
2665 self.assertRaises(ValueError, t.read)
2666 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2667 self.assertRaises(ValueError, t.read)
2668
Benjamin Peterson53ae6142014-12-21 20:51:50 -06002669 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2670 self.assertRaises(Exception, repr, t)
2671
Antoine Pitrou19690592009-06-12 20:14:08 +00002672 def test_garbage_collection(self):
2673 # C TextIOWrapper objects are collected, and collecting them flushes
2674 # all data to disk.
2675 # The Python version has __del__, so it ends in gc.garbage instead.
2676 rawio = io.FileIO(support.TESTFN, "wb")
2677 b = self.BufferedWriter(rawio)
2678 t = self.TextIOWrapper(b, encoding="ascii")
2679 t.write("456def")
2680 t.x = t
2681 wr = weakref.ref(t)
2682 del t
2683 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03002684 self.assertIsNone(wr(), wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002685 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002686 self.assertEqual(f.read(), b"456def")
2687
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002688 def test_rwpair_cleared_before_textio(self):
2689 # Issue 13070: TextIOWrapper's finalization would crash when called
2690 # after the reference to the underlying BufferedRWPair's writer got
2691 # cleared by the GC.
2692 for i in range(1000):
2693 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2694 t1 = self.TextIOWrapper(b1, encoding="ascii")
2695 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2696 t2 = self.TextIOWrapper(b2, encoding="ascii")
2697 # circular references
2698 t1.buddy = t2
2699 t2.buddy = t1
2700 support.gc_collect()
2701
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002702 maybeRaises = unittest.TestCase.assertRaises
2703
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002704
Antoine Pitrou19690592009-06-12 20:14:08 +00002705class PyTextIOWrapperTest(TextIOWrapperTest):
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002706 @contextlib.contextmanager
2707 def maybeRaises(self, *args, **kwds):
2708 yield
Antoine Pitrou19690592009-06-12 20:14:08 +00002709
2710
2711class IncrementalNewlineDecoderTest(unittest.TestCase):
2712
2713 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002714 # UTF-8 specific tests for a newline decoder
2715 def _check_decode(b, s, **kwargs):
2716 # We exercise getstate() / setstate() as well as decode()
2717 state = decoder.getstate()
Ezio Melotti2623a372010-11-21 13:34:58 +00002718 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002719 decoder.setstate(state)
Ezio Melotti2623a372010-11-21 13:34:58 +00002720 self.assertEqual(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002721
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002722 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002723
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002724 _check_decode(b'\xe8', "")
2725 _check_decode(b'\xa2', "")
2726 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002727
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002728 _check_decode(b'\xe8', "")
2729 _check_decode(b'\xa2', "")
2730 _check_decode(b'\x88', "\u8888")
2731
2732 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002733 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2734
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002735 decoder.reset()
2736 _check_decode(b'\n', "\n")
2737 _check_decode(b'\r', "")
2738 _check_decode(b'', "\n", final=True)
2739 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002740
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002741 _check_decode(b'\r', "")
2742 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002743
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002744 _check_decode(b'\r\r\n', "\n\n")
2745 _check_decode(b'\r', "")
2746 _check_decode(b'\r', "\n")
2747 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002748
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002749 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2750 _check_decode(b'\xe8\xa2\x88', "\u8888")
2751 _check_decode(b'\n', "\n")
2752 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2753 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002754
Antoine Pitrou19690592009-06-12 20:14:08 +00002755 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002756 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002757 if encoding is not None:
2758 encoder = codecs.getincrementalencoder(encoding)()
2759 def _decode_bytewise(s):
2760 # Decode one byte at a time
2761 for b in encoder.encode(s):
2762 result.append(decoder.decode(b))
2763 else:
2764 encoder = None
2765 def _decode_bytewise(s):
2766 # Decode one char at a time
2767 for c in s:
2768 result.append(decoder.decode(c))
Ezio Melotti2623a372010-11-21 13:34:58 +00002769 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002770 _decode_bytewise("abc\n\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002771 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002772 _decode_bytewise("\nabc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002773 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002774 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002775 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002776 _decode_bytewise("abc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002777 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002778 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002779 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002780 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002781 input = "abc"
2782 if encoder is not None:
2783 encoder.reset()
2784 input = encoder.encode(input)
Ezio Melotti2623a372010-11-21 13:34:58 +00002785 self.assertEqual(decoder.decode(input), "abc")
2786 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002787
2788 def test_newline_decoder(self):
2789 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002790 # None meaning the IncrementalNewlineDecoder takes unicode input
2791 # rather than bytes input
2792 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002793 'utf-16', 'utf-16-le', 'utf-16-be',
2794 'utf-32', 'utf-32-le', 'utf-32-be',
2795 )
2796 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002797 decoder = enc and codecs.getincrementaldecoder(enc)()
2798 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2799 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002800 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002801 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2802 self.check_newline_decoding_utf8(decoder)
2803
2804 def test_newline_bytes(self):
2805 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2806 def _check(dec):
Ezio Melotti2623a372010-11-21 13:34:58 +00002807 self.assertEqual(dec.newlines, None)
2808 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2809 self.assertEqual(dec.newlines, None)
2810 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2811 self.assertEqual(dec.newlines, None)
Antoine Pitrou19690592009-06-12 20:14:08 +00002812 dec = self.IncrementalNewlineDecoder(None, translate=False)
2813 _check(dec)
2814 dec = self.IncrementalNewlineDecoder(None, translate=True)
2815 _check(dec)
2816
2817class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2818 pass
2819
2820class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2821 pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002822
Christian Heimes1a6387e2008-03-26 12:49:49 +00002823
2824# XXX Tests for open()
2825
2826class MiscIOTest(unittest.TestCase):
2827
Benjamin Petersonad100c32008-11-20 22:06:22 +00002828 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002829 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002830
Antoine Pitrou19690592009-06-12 20:14:08 +00002831 def test___all__(self):
2832 for name in self.io.__all__:
2833 obj = getattr(self.io, name, None)
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03002834 self.assertIsNotNone(obj, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002835 if name == "open":
2836 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002837 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002838 self.assertTrue(issubclass(obj, Exception), name)
2839 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002840 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002841
Benjamin Petersonad100c32008-11-20 22:06:22 +00002842 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002843 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002844 self.assertEqual(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002845 f.close()
2846
Antoine Pitrou19690592009-06-12 20:14:08 +00002847 f = self.open(support.TESTFN, "U")
Ezio Melotti2623a372010-11-21 13:34:58 +00002848 self.assertEqual(f.name, support.TESTFN)
2849 self.assertEqual(f.buffer.name, support.TESTFN)
2850 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2851 self.assertEqual(f.mode, "U")
2852 self.assertEqual(f.buffer.mode, "rb")
2853 self.assertEqual(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002854 f.close()
2855
Antoine Pitrou19690592009-06-12 20:14:08 +00002856 f = self.open(support.TESTFN, "w+")
Ezio Melotti2623a372010-11-21 13:34:58 +00002857 self.assertEqual(f.mode, "w+")
2858 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2859 self.assertEqual(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002860
Antoine Pitrou19690592009-06-12 20:14:08 +00002861 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melotti2623a372010-11-21 13:34:58 +00002862 self.assertEqual(g.mode, "wb")
2863 self.assertEqual(g.raw.mode, "wb")
2864 self.assertEqual(g.name, f.fileno())
2865 self.assertEqual(g.raw.name, f.fileno())
Benjamin Petersonad100c32008-11-20 22:06:22 +00002866 f.close()
2867 g.close()
2868
Antoine Pitrou19690592009-06-12 20:14:08 +00002869 def test_io_after_close(self):
2870 for kwargs in [
2871 {"mode": "w"},
2872 {"mode": "wb"},
2873 {"mode": "w", "buffering": 1},
2874 {"mode": "w", "buffering": 2},
2875 {"mode": "wb", "buffering": 0},
2876 {"mode": "r"},
2877 {"mode": "rb"},
2878 {"mode": "r", "buffering": 1},
2879 {"mode": "r", "buffering": 2},
2880 {"mode": "rb", "buffering": 0},
2881 {"mode": "w+"},
2882 {"mode": "w+b"},
2883 {"mode": "w+", "buffering": 1},
2884 {"mode": "w+", "buffering": 2},
2885 {"mode": "w+b", "buffering": 0},
2886 ]:
2887 f = self.open(support.TESTFN, **kwargs)
2888 f.close()
2889 self.assertRaises(ValueError, f.flush)
2890 self.assertRaises(ValueError, f.fileno)
2891 self.assertRaises(ValueError, f.isatty)
2892 self.assertRaises(ValueError, f.__iter__)
2893 if hasattr(f, "peek"):
2894 self.assertRaises(ValueError, f.peek, 1)
2895 self.assertRaises(ValueError, f.read)
2896 if hasattr(f, "read1"):
2897 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinner5100a402011-05-25 22:15:36 +02002898 if hasattr(f, "readall"):
2899 self.assertRaises(ValueError, f.readall)
Antoine Pitrou19690592009-06-12 20:14:08 +00002900 if hasattr(f, "readinto"):
2901 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2902 self.assertRaises(ValueError, f.readline)
2903 self.assertRaises(ValueError, f.readlines)
2904 self.assertRaises(ValueError, f.seek, 0)
2905 self.assertRaises(ValueError, f.tell)
2906 self.assertRaises(ValueError, f.truncate)
2907 self.assertRaises(ValueError, f.write,
2908 b"" if "b" in kwargs['mode'] else "")
2909 self.assertRaises(ValueError, f.writelines, [])
2910 self.assertRaises(ValueError, next, f)
2911
2912 def test_blockingioerror(self):
2913 # Various BlockingIOError issues
2914 self.assertRaises(TypeError, self.BlockingIOError)
2915 self.assertRaises(TypeError, self.BlockingIOError, 1)
2916 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2917 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2918 b = self.BlockingIOError(1, "")
2919 self.assertEqual(b.characters_written, 0)
2920 class C(unicode):
2921 pass
2922 c = C("")
2923 b = self.BlockingIOError(1, c)
2924 c.b = b
2925 b.c = c
2926 wr = weakref.ref(c)
2927 del c, b
2928 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03002929 self.assertIsNone(wr(), wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002930
2931 def test_abcs(self):
2932 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002933 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2934 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2935 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2936 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002937
2938 def _check_abc_inheritance(self, abcmodule):
2939 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002940 self.assertIsInstance(f, abcmodule.IOBase)
2941 self.assertIsInstance(f, abcmodule.RawIOBase)
2942 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2943 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002944 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002945 self.assertIsInstance(f, abcmodule.IOBase)
2946 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2947 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2948 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002949 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002950 self.assertIsInstance(f, abcmodule.IOBase)
2951 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2952 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2953 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002954
2955 def test_abc_inheritance(self):
2956 # Test implementations inherit from their respective ABCs
2957 self._check_abc_inheritance(self)
2958
2959 def test_abc_inheritance_official(self):
2960 # Test implementations inherit from the official ABCs of the
2961 # baseline "io" module.
2962 self._check_abc_inheritance(io)
2963
Antoine Pitrou5aa7df32011-11-21 20:16:44 +01002964 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2965 def test_nonblock_pipe_write_bigbuf(self):
2966 self._test_nonblock_pipe_write(16*1024)
2967
2968 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2969 def test_nonblock_pipe_write_smallbuf(self):
2970 self._test_nonblock_pipe_write(1024)
2971
2972 def _set_non_blocking(self, fd):
2973 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2974 self.assertNotEqual(flags, -1)
2975 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2976 self.assertEqual(res, 0)
2977
2978 def _test_nonblock_pipe_write(self, bufsize):
2979 sent = []
2980 received = []
2981 r, w = os.pipe()
2982 self._set_non_blocking(r)
2983 self._set_non_blocking(w)
2984
2985 # To exercise all code paths in the C implementation we need
2986 # to play with buffer sizes. For instance, if we choose a
2987 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2988 # then we will never get a partial write of the buffer.
2989 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2990 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2991
2992 with rf, wf:
2993 for N in 9999, 73, 7574:
2994 try:
2995 i = 0
2996 while True:
2997 msg = bytes([i % 26 + 97]) * N
2998 sent.append(msg)
2999 wf.write(msg)
3000 i += 1
3001
3002 except self.BlockingIOError as e:
3003 self.assertEqual(e.args[0], errno.EAGAIN)
3004 sent[-1] = sent[-1][:e.characters_written]
3005 received.append(rf.read())
3006 msg = b'BLOCKED'
3007 wf.write(msg)
3008 sent.append(msg)
3009
3010 while True:
3011 try:
3012 wf.flush()
3013 break
3014 except self.BlockingIOError as e:
3015 self.assertEqual(e.args[0], errno.EAGAIN)
3016 self.assertEqual(e.characters_written, 0)
3017 received.append(rf.read())
3018
3019 received += iter(rf.read, None)
3020
3021 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03003022 self.assertEqual(sent, received)
Antoine Pitrou5aa7df32011-11-21 20:16:44 +01003023 self.assertTrue(wf.closed)
3024 self.assertTrue(rf.closed)
3025
Antoine Pitrou19690592009-06-12 20:14:08 +00003026class CMiscIOTest(MiscIOTest):
3027 io = io
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03003028 shutdown_error = "RuntimeError: could not find io module state"
Antoine Pitrou19690592009-06-12 20:14:08 +00003029
3030class PyMiscIOTest(MiscIOTest):
3031 io = pyio
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03003032 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Petersonad100c32008-11-20 22:06:22 +00003033
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003034
3035@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3036class SignalsTest(unittest.TestCase):
3037
3038 def setUp(self):
3039 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3040
3041 def tearDown(self):
3042 signal.signal(signal.SIGALRM, self.oldalrm)
3043
3044 def alarm_interrupt(self, sig, frame):
Florent Xicluna3fa3b002010-09-13 08:20:19 +00003045 1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003046
3047 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinner49d495f2011-07-04 11:44:46 +02003048 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
3049 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003050 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3051 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou6439c002011-02-25 21:35:47 +00003052 invokes the signal handler, and bubbles up the exception raised
3053 in the latter."""
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003054 read_results = []
3055 def _read():
3056 s = os.read(r, 1)
3057 read_results.append(s)
3058 t = threading.Thread(target=_read)
3059 t.daemon = True
3060 r, w = os.pipe()
3061 try:
3062 wio = self.io.open(w, **fdopen_kwargs)
3063 t.start()
3064 signal.alarm(1)
3065 # Fill the pipe enough that the write will be blocking.
3066 # It will be interrupted by the timer armed above. Since the
3067 # other thread has read one byte, the low-level write will
3068 # return with a successful (partial) result rather than an EINTR.
3069 # The buffered IO layer must check for pending signal
3070 # handlers, which in this case will invoke alarm_interrupt().
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03003071 try:
3072 with self.assertRaises(ZeroDivisionError):
3073 wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1))
3074 finally:
3075 t.join()
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003076 # We got one byte, get another one and check that it isn't a
3077 # repeat of the first one.
3078 read_results.append(os.read(r, 1))
3079 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3080 finally:
3081 os.close(w)
3082 os.close(r)
3083 # This is deliberate. If we didn't close the file descriptor
3084 # before closing wio, wio would try to flush its internal
3085 # buffer, and block again.
3086 try:
3087 wio.close()
3088 except IOError as e:
3089 if e.errno != errno.EBADF:
3090 raise
3091
3092 def test_interrupted_write_unbuffered(self):
3093 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3094
3095 def test_interrupted_write_buffered(self):
3096 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3097
3098 def test_interrupted_write_text(self):
3099 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3100
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003101 def check_reentrant_write(self, data, **fdopen_kwargs):
3102 def on_alarm(*args):
3103 # Will be called reentrantly from the same thread
3104 wio.write(data)
Victor Stinner4c41f842011-07-05 13:29:26 +02003105 1//0
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003106 signal.signal(signal.SIGALRM, on_alarm)
3107 r, w = os.pipe()
3108 wio = self.io.open(w, **fdopen_kwargs)
3109 try:
3110 signal.alarm(1)
3111 # Either the reentrant call to wio.write() fails with RuntimeError,
3112 # or the signal handler raises ZeroDivisionError.
3113 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3114 while 1:
3115 for i in range(100):
3116 wio.write(data)
3117 wio.flush()
3118 # Make sure the buffer doesn't fill up and block further writes
3119 os.read(r, len(data) * 100)
3120 exc = cm.exception
3121 if isinstance(exc, RuntimeError):
3122 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3123 finally:
3124 wio.close()
3125 os.close(r)
3126
3127 def test_reentrant_write_buffered(self):
3128 self.check_reentrant_write(b"xy", mode="wb")
3129
3130 def test_reentrant_write_text(self):
3131 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3132
Antoine Pitrou6439c002011-02-25 21:35:47 +00003133 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3134 """Check that a buffered read, when it gets interrupted (either
3135 returning a partial result or EINTR), properly invokes the signal
3136 handler and retries if the latter returned successfully."""
3137 r, w = os.pipe()
3138 fdopen_kwargs["closefd"] = False
3139 def alarm_handler(sig, frame):
3140 os.write(w, b"bar")
3141 signal.signal(signal.SIGALRM, alarm_handler)
3142 try:
3143 rio = self.io.open(r, **fdopen_kwargs)
3144 os.write(w, b"foo")
3145 signal.alarm(1)
3146 # Expected behaviour:
3147 # - first raw read() returns partial b"foo"
3148 # - second raw read() returns EINTR
3149 # - third raw read() returns b"bar"
3150 self.assertEqual(decode(rio.read(6)), "foobar")
3151 finally:
3152 rio.close()
3153 os.close(w)
3154 os.close(r)
3155
3156 def test_interrupterd_read_retry_buffered(self):
3157 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3158 mode="rb")
3159
3160 def test_interrupterd_read_retry_text(self):
3161 self.check_interrupted_read_retry(lambda x: x,
3162 mode="r")
3163
3164 @unittest.skipUnless(threading, 'Threading required for this test.')
3165 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3166 """Check that a buffered write, when it gets interrupted (either
3167 returning a partial result or EINTR), properly invokes the signal
3168 handler and retries if the latter returned successfully."""
3169 select = support.import_module("select")
3170 # A quantity that exceeds the buffer size of an anonymous pipe's
3171 # write end.
Antoine Pitrou68915d72013-04-24 23:31:38 +02003172 N = support.PIPE_MAX_SIZE
Antoine Pitrou6439c002011-02-25 21:35:47 +00003173 r, w = os.pipe()
3174 fdopen_kwargs["closefd"] = False
3175 # We need a separate thread to read from the pipe and allow the
3176 # write() to finish. This thread is started after the SIGALRM is
3177 # received (forcing a first EINTR in write()).
3178 read_results = []
3179 write_finished = False
Serhiy Storchaka53ea1622015-03-28 20:38:48 +02003180 error = [None]
Antoine Pitrou6439c002011-02-25 21:35:47 +00003181 def _read():
Serhiy Storchaka53ea1622015-03-28 20:38:48 +02003182 try:
3183 while not write_finished:
3184 while r in select.select([r], [], [], 1.0)[0]:
3185 s = os.read(r, 1024)
3186 read_results.append(s)
3187 except BaseException as exc:
3188 error[0] = exc
Antoine Pitrou6439c002011-02-25 21:35:47 +00003189 t = threading.Thread(target=_read)
3190 t.daemon = True
3191 def alarm1(sig, frame):
3192 signal.signal(signal.SIGALRM, alarm2)
3193 signal.alarm(1)
3194 def alarm2(sig, frame):
3195 t.start()
3196 signal.signal(signal.SIGALRM, alarm1)
3197 try:
3198 wio = self.io.open(w, **fdopen_kwargs)
3199 signal.alarm(1)
3200 # Expected behaviour:
3201 # - first raw write() is partial (because of the limited pipe buffer
3202 # and the first alarm)
3203 # - second raw write() returns EINTR (because of the second alarm)
3204 # - subsequent write()s are successful (either partial or complete)
3205 self.assertEqual(N, wio.write(item * N))
3206 wio.flush()
3207 write_finished = True
3208 t.join()
Serhiy Storchaka53ea1622015-03-28 20:38:48 +02003209
3210 self.assertIsNone(error[0])
Antoine Pitrou6439c002011-02-25 21:35:47 +00003211 self.assertEqual(N, sum(len(x) for x in read_results))
3212 finally:
3213 write_finished = True
3214 os.close(w)
3215 os.close(r)
3216 # This is deliberate. If we didn't close the file descriptor
3217 # before closing wio, wio would try to flush its internal
3218 # buffer, and could block (in case of failure).
3219 try:
3220 wio.close()
3221 except IOError as e:
3222 if e.errno != errno.EBADF:
3223 raise
3224
3225 def test_interrupterd_write_retry_buffered(self):
3226 self.check_interrupted_write_retry(b"x", mode="wb")
3227
3228 def test_interrupterd_write_retry_text(self):
3229 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3230
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003231
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003232class CSignalsTest(SignalsTest):
3233 io = io
3234
3235class PySignalsTest(SignalsTest):
3236 io = pyio
3237
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003238 # Handling reentrancy issues would slow down _pyio even more, so the
3239 # tests are disabled.
3240 test_reentrant_write_buffered = None
3241 test_reentrant_write_text = None
3242
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003243
Christian Heimes1a6387e2008-03-26 12:49:49 +00003244def test_main():
Antoine Pitrou19690592009-06-12 20:14:08 +00003245 tests = (CIOTest, PyIOTest,
3246 CBufferedReaderTest, PyBufferedReaderTest,
3247 CBufferedWriterTest, PyBufferedWriterTest,
3248 CBufferedRWPairTest, PyBufferedRWPairTest,
3249 CBufferedRandomTest, PyBufferedRandomTest,
3250 StatefulIncrementalDecoderTest,
3251 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3252 CTextIOWrapperTest, PyTextIOWrapperTest,
3253 CMiscIOTest, PyMiscIOTest,
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003254 CSignalsTest, PySignalsTest,
Antoine Pitrou19690592009-06-12 20:14:08 +00003255 )
3256
3257 # Put the namespaces of the IO module we are testing and some useful mock
3258 # classes in the __dict__ of each test.
3259 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou6391b342010-09-14 18:48:19 +00003260 MockNonBlockWriterIO, MockRawIOWithoutRead)
Antoine Pitrou19690592009-06-12 20:14:08 +00003261 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3262 c_io_ns = dict((name, getattr(io, name)) for name in all_members)
3263 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
3264 globs = globals()
3265 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3266 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3267 # Avoid turning open into a bound method.
3268 py_io_ns["open"] = pyio.OpenWrapper
3269 for test in tests:
3270 if test.__name__.startswith("C"):
3271 for name, obj in c_io_ns.items():
3272 setattr(test, name, obj)
3273 elif test.__name__.startswith("Py"):
3274 for name, obj in py_io_ns.items():
3275 setattr(test, name, obj)
3276
3277 support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00003278
3279if __name__ == "__main__":
Antoine Pitrou19690592009-06-12 20:14:08 +00003280 test_main()