blob: 34760c9533ec7c3899c9a3fbbbbdf7a39876e35a [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakac72e66a2015-11-02 15:06:09 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Antoine Pitrou19690592009-06-12 20:14:08 +000019# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +030032import warnings
Antoine Pitrou19690592009-06-12 20:14:08 +000033import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000034import signal
35import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000036from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000037from collections import deque
Antoine Pitrou78e761e2012-10-16 22:57:11 +020038from UserList import UserList
Antoine Pitrou19690592009-06-12 20:14:08 +000039from test import test_support as support
Serhiy Storchaka354d50e2013-02-03 17:10:42 +020040import contextlib
Christian Heimes1a6387e2008-03-26 12:49:49 +000041
42import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000043import io # C implementation of io
44import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000045try:
46 import threading
47except ImportError:
48 threading = None
Antoine Pitrou5aa7df32011-11-21 20:16:44 +010049try:
50 import fcntl
51except ImportError:
52 fcntl = None
Antoine Pitrou19690592009-06-12 20:14:08 +000053
54__metaclass__ = type
55bytes = support.py3k_bytes
56
57def _default_chunk_size():
58 """Get the default TextIOWrapper chunk size"""
59 with io.open(__file__, "r", encoding="latin1") as f:
60 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000061
62
Antoine Pitrou6391b342010-09-14 18:48:19 +000063class MockRawIOWithoutRead:
64 """A RawIO implementation without read(), so as to exercise the default
65 RawIO.read() which calls readinto()."""
Christian Heimes1a6387e2008-03-26 12:49:49 +000066
67 def __init__(self, read_stack=()):
68 self._read_stack = list(read_stack)
69 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +000070 self._reads = 0
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000071 self._extraneous_reads = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +000072
Christian Heimes1a6387e2008-03-26 12:49:49 +000073 def write(self, b):
Antoine Pitrou19690592009-06-12 20:14:08 +000074 self._write_stack.append(bytes(b))
Christian Heimes1a6387e2008-03-26 12:49:49 +000075 return len(b)
76
77 def writable(self):
78 return True
79
80 def fileno(self):
81 return 42
82
83 def readable(self):
84 return True
85
86 def seekable(self):
87 return True
88
89 def seek(self, pos, whence):
Antoine Pitrou19690592009-06-12 20:14:08 +000090 return 0 # wrong but we gotta return something
Christian Heimes1a6387e2008-03-26 12:49:49 +000091
92 def tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +000093 return 0 # same comment as above
94
95 def readinto(self, buf):
96 self._reads += 1
97 max_len = len(buf)
98 try:
99 data = self._read_stack[0]
100 except IndexError:
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000101 self._extraneous_reads += 1
Antoine Pitrou19690592009-06-12 20:14:08 +0000102 return 0
103 if data is None:
104 del self._read_stack[0]
105 return None
106 n = len(data)
107 if len(data) <= max_len:
108 del self._read_stack[0]
109 buf[:n] = data
110 return n
111 else:
112 buf[:] = data[:max_len]
113 self._read_stack[0] = data[max_len:]
114 return max_len
115
116 def truncate(self, pos=None):
117 return pos
118
Antoine Pitrou6391b342010-09-14 18:48:19 +0000119class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
120 pass
121
122class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
123 pass
124
125
126class MockRawIO(MockRawIOWithoutRead):
127
128 def read(self, n=None):
129 self._reads += 1
130 try:
131 return self._read_stack.pop(0)
132 except:
133 self._extraneous_reads += 1
134 return b""
135
Antoine Pitrou19690592009-06-12 20:14:08 +0000136class CMockRawIO(MockRawIO, io.RawIOBase):
137 pass
138
139class PyMockRawIO(MockRawIO, pyio.RawIOBase):
140 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000141
142
Antoine Pitrou19690592009-06-12 20:14:08 +0000143class MisbehavedRawIO(MockRawIO):
144 def write(self, b):
145 return MockRawIO.write(self, b) * 2
146
147 def read(self, n=None):
148 return MockRawIO.read(self, n) * 2
149
150 def seek(self, pos, whence):
151 return -123
152
153 def tell(self):
154 return -456
155
156 def readinto(self, buf):
157 MockRawIO.readinto(self, buf)
158 return len(buf) * 5
159
160class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
161 pass
162
163class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
164 pass
165
166
167class CloseFailureIO(MockRawIO):
168 closed = 0
169
170 def close(self):
171 if not self.closed:
172 self.closed = 1
173 raise IOError
174
175class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
176 pass
177
178class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
179 pass
180
181
182class MockFileIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000183
184 def __init__(self, data):
185 self.read_history = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000186 super(MockFileIO, self).__init__(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000187
188 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +0000189 res = super(MockFileIO, self).read(n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000190 self.read_history.append(None if res is None else len(res))
191 return res
192
Antoine Pitrou19690592009-06-12 20:14:08 +0000193 def readinto(self, b):
194 res = super(MockFileIO, self).readinto(b)
195 self.read_history.append(res)
196 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000197
Antoine Pitrou19690592009-06-12 20:14:08 +0000198class CMockFileIO(MockFileIO, io.BytesIO):
199 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000200
Antoine Pitrou19690592009-06-12 20:14:08 +0000201class PyMockFileIO(MockFileIO, pyio.BytesIO):
202 pass
203
204
205class MockNonBlockWriterIO:
206
207 def __init__(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000208 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000209 self._blocker_char = None
Christian Heimes1a6387e2008-03-26 12:49:49 +0000210
Antoine Pitrou19690592009-06-12 20:14:08 +0000211 def pop_written(self):
212 s = b"".join(self._write_stack)
213 self._write_stack[:] = []
214 return s
215
216 def block_on(self, char):
217 """Block when a given char is encountered."""
218 self._blocker_char = char
219
220 def readable(self):
221 return True
222
223 def seekable(self):
224 return True
Christian Heimes1a6387e2008-03-26 12:49:49 +0000225
226 def writable(self):
227 return True
228
Antoine Pitrou19690592009-06-12 20:14:08 +0000229 def write(self, b):
230 b = bytes(b)
231 n = -1
232 if self._blocker_char:
233 try:
234 n = b.index(self._blocker_char)
235 except ValueError:
236 pass
237 else:
Antoine Pitrou5aa7df32011-11-21 20:16:44 +0100238 if n > 0:
239 # write data up to the first blocker
240 self._write_stack.append(b[:n])
241 return n
242 else:
243 # cancel blocker and indicate would block
244 self._blocker_char = None
245 return None
Antoine Pitrou19690592009-06-12 20:14:08 +0000246 self._write_stack.append(b)
247 return len(b)
248
249class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
250 BlockingIOError = io.BlockingIOError
251
252class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
253 BlockingIOError = pyio.BlockingIOError
254
Christian Heimes1a6387e2008-03-26 12:49:49 +0000255
256class IOTest(unittest.TestCase):
257
Antoine Pitrou19690592009-06-12 20:14:08 +0000258 def setUp(self):
259 support.unlink(support.TESTFN)
260
Christian Heimes1a6387e2008-03-26 12:49:49 +0000261 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000262 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000263
264 def write_ops(self, f):
265 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000266 f.truncate(0)
267 self.assertEqual(f.tell(), 5)
268 f.seek(0)
269
270 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000271 self.assertEqual(f.seek(0), 0)
272 self.assertEqual(f.write(b"Hello."), 6)
273 self.assertEqual(f.tell(), 6)
274 self.assertEqual(f.seek(-1, 1), 5)
275 self.assertEqual(f.tell(), 5)
276 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
277 self.assertEqual(f.seek(0), 0)
278 self.assertEqual(f.write(b"h"), 1)
279 self.assertEqual(f.seek(-1, 2), 13)
280 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000281
Christian Heimes1a6387e2008-03-26 12:49:49 +0000282 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000283 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000284 self.assertRaises(TypeError, f.seek, 0.0)
285
286 def read_ops(self, f, buffered=False):
287 data = f.read(5)
288 self.assertEqual(data, b"hello")
289 data = bytearray(data)
290 self.assertEqual(f.readinto(data), 5)
291 self.assertEqual(data, b" worl")
292 self.assertEqual(f.readinto(data), 2)
293 self.assertEqual(len(data), 5)
294 self.assertEqual(data[:2], b"d\n")
295 self.assertEqual(f.seek(0), 0)
296 self.assertEqual(f.read(20), b"hello world\n")
297 self.assertEqual(f.read(1), b"")
298 self.assertEqual(f.readinto(bytearray(b"x")), 0)
299 self.assertEqual(f.seek(-6, 2), 6)
300 self.assertEqual(f.read(5), b"world")
301 self.assertEqual(f.read(0), b"")
302 self.assertEqual(f.readinto(bytearray()), 0)
303 self.assertEqual(f.seek(-6, 1), 5)
304 self.assertEqual(f.read(5), b" worl")
305 self.assertEqual(f.tell(), 10)
306 self.assertRaises(TypeError, f.seek, 0.0)
307 if buffered:
308 f.seek(0)
309 self.assertEqual(f.read(), b"hello world\n")
310 f.seek(6)
311 self.assertEqual(f.read(), b"world\n")
312 self.assertEqual(f.read(), b"")
313
314 LARGE = 2**31
315
316 def large_file_ops(self, f):
317 assert f.readable()
318 assert f.writable()
319 self.assertEqual(f.seek(self.LARGE), self.LARGE)
320 self.assertEqual(f.tell(), self.LARGE)
321 self.assertEqual(f.write(b"xxx"), 3)
322 self.assertEqual(f.tell(), self.LARGE + 3)
323 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
324 self.assertEqual(f.truncate(), self.LARGE + 2)
325 self.assertEqual(f.tell(), self.LARGE + 2)
326 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
327 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000328 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000329 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
330 self.assertEqual(f.seek(-1, 2), self.LARGE)
331 self.assertEqual(f.read(2), b"x")
332
Antoine Pitrou19690592009-06-12 20:14:08 +0000333 def test_invalid_operations(self):
334 # Try writing on a file opened in read mode and vice-versa.
335 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000336 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000337 self.assertRaises(IOError, fp.read)
338 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000339 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000340 self.assertRaises(IOError, fp.write, b"blah")
341 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000342 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000343 self.assertRaises(IOError, fp.write, "blah")
344 self.assertRaises(IOError, fp.writelines, ["blah\n"])
345
Christian Heimes1a6387e2008-03-26 12:49:49 +0000346 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000347 with self.open(support.TESTFN, "wb", buffering=0) as f:
348 self.assertEqual(f.readable(), False)
349 self.assertEqual(f.writable(), True)
350 self.assertEqual(f.seekable(), True)
351 self.write_ops(f)
352 with self.open(support.TESTFN, "rb", buffering=0) as f:
353 self.assertEqual(f.readable(), True)
354 self.assertEqual(f.writable(), False)
355 self.assertEqual(f.seekable(), True)
356 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000357
358 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000359 with self.open(support.TESTFN, "wb") as f:
360 self.assertEqual(f.readable(), False)
361 self.assertEqual(f.writable(), True)
362 self.assertEqual(f.seekable(), True)
363 self.write_ops(f)
364 with self.open(support.TESTFN, "rb") as f:
365 self.assertEqual(f.readable(), True)
366 self.assertEqual(f.writable(), False)
367 self.assertEqual(f.seekable(), True)
368 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000369
370 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000371 with self.open(support.TESTFN, "wb") as f:
372 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
373 with self.open(support.TESTFN, "rb") as f:
374 self.assertEqual(f.readline(), b"abc\n")
375 self.assertEqual(f.readline(10), b"def\n")
376 self.assertEqual(f.readline(2), b"xy")
377 self.assertEqual(f.readline(4), b"zzy\n")
378 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000379 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000380 self.assertRaises(TypeError, f.readline, 5.3)
381 with self.open(support.TESTFN, "r") as f:
382 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000383
384 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000385 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000386 self.write_ops(f)
387 data = f.getvalue()
388 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000389 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000390 self.read_ops(f, True)
391
392 def test_large_file_ops(self):
393 # On Windows and Mac OSX this test comsumes large resources; It takes
394 # a long time to build the >2GB file and takes >2GB of disk space
395 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000396 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware1f702212013-12-10 14:09:20 -0600397 support.requires(
398 'largefile',
399 'test requires %s bytes and a long time to run' % self.LARGE)
Antoine Pitrou19690592009-06-12 20:14:08 +0000400 with self.open(support.TESTFN, "w+b", 0) as f:
401 self.large_file_ops(f)
402 with self.open(support.TESTFN, "w+b") as f:
403 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000404
405 def test_with_open(self):
406 for bufsize in (0, 1, 100):
407 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000408 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000409 f.write(b"xxx")
410 self.assertEqual(f.closed, True)
411 f = None
412 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000413 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000414 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000415 except ZeroDivisionError:
416 self.assertEqual(f.closed, True)
417 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000418 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000419
Antoine Pitroue741cc62009-01-21 00:45:36 +0000420 # issue 5008
421 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000422 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000423 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000424 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000425 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000426 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000427 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000428 with self.open(support.TESTFN, "a") as f:
Serhiy Storchakaea4d2872015-08-02 15:19:04 +0300429 self.assertGreater(f.tell(), 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000430
Christian Heimes1a6387e2008-03-26 12:49:49 +0000431 def test_destructor(self):
432 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000433 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000434 def __del__(self):
435 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000436 try:
437 f = super(MyFileIO, self).__del__
438 except AttributeError:
439 pass
440 else:
441 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000442 def close(self):
443 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000444 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000445 def flush(self):
446 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000447 super(MyFileIO, self).flush()
448 f = MyFileIO(support.TESTFN, "wb")
449 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000450 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000451 support.gc_collect()
452 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000453 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000454 self.assertEqual(f.read(), b"xxx")
455
456 def _check_base_destructor(self, base):
457 record = []
458 class MyIO(base):
459 def __init__(self):
460 # This exercises the availability of attributes on object
461 # destruction.
462 # (in the C version, close() is called by the tp_dealloc
463 # function, not by __del__)
464 self.on_del = 1
465 self.on_close = 2
466 self.on_flush = 3
467 def __del__(self):
468 record.append(self.on_del)
469 try:
470 f = super(MyIO, self).__del__
471 except AttributeError:
472 pass
473 else:
474 f()
475 def close(self):
476 record.append(self.on_close)
477 super(MyIO, self).close()
478 def flush(self):
479 record.append(self.on_flush)
480 super(MyIO, self).flush()
481 f = MyIO()
482 del f
483 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000484 self.assertEqual(record, [1, 2, 3])
485
Antoine Pitrou19690592009-06-12 20:14:08 +0000486 def test_IOBase_destructor(self):
487 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000488
Antoine Pitrou19690592009-06-12 20:14:08 +0000489 def test_RawIOBase_destructor(self):
490 self._check_base_destructor(self.RawIOBase)
491
492 def test_BufferedIOBase_destructor(self):
493 self._check_base_destructor(self.BufferedIOBase)
494
495 def test_TextIOBase_destructor(self):
496 self._check_base_destructor(self.TextIOBase)
497
498 def test_close_flushes(self):
499 with self.open(support.TESTFN, "wb") as f:
500 f.write(b"xxx")
501 with self.open(support.TESTFN, "rb") as f:
502 self.assertEqual(f.read(), b"xxx")
503
504 def test_array_writes(self):
505 a = array.array(b'i', range(10))
506 n = len(a.tostring())
507 with self.open(support.TESTFN, "wb", 0) as f:
508 self.assertEqual(f.write(a), n)
509 with self.open(support.TESTFN, "wb") as f:
510 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000511
512 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000513 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000514 closefd=False)
515
Antoine Pitrou19690592009-06-12 20:14:08 +0000516 def test_read_closed(self):
517 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000518 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000519 with self.open(support.TESTFN, "r") as f:
520 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000521 self.assertEqual(file.read(), "egg\n")
522 file.seek(0)
523 file.close()
524 self.assertRaises(ValueError, file.read)
525
526 def test_no_closefd_with_filename(self):
527 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000528 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000529
530 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000531 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000532 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000533 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000534 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000535 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000536 self.assertEqual(file.buffer.raw.closefd, False)
537
Antoine Pitrou19690592009-06-12 20:14:08 +0000538 def test_garbage_collection(self):
539 # FileIO objects are collected, and collecting them flushes
540 # all data to disk.
541 f = self.FileIO(support.TESTFN, "wb")
542 f.write(b"abcxxx")
543 f.f = f
544 wr = weakref.ref(f)
545 del f
546 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +0300547 self.assertIsNone(wr(), wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000548 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000549 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000550
Antoine Pitrou19690592009-06-12 20:14:08 +0000551 def test_unbounded_file(self):
552 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
553 zero = "/dev/zero"
554 if not os.path.exists(zero):
555 self.skipTest("{0} does not exist".format(zero))
556 if sys.maxsize > 0x7FFFFFFF:
557 self.skipTest("test can only run in a 32-bit address space")
558 if support.real_max_memuse < support._2G:
559 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000560 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000561 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000562 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000563 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000564 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000565 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000566
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200567 def check_flush_error_on_close(self, *args, **kwargs):
568 # Test that the file is closed despite failed flush
569 # and that flush() is called before file closed.
570 f = self.open(*args, **kwargs)
571 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000572 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200573 closed[:] = [f.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000574 raise IOError()
575 f.flush = bad_flush
576 self.assertRaises(IOError, f.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600577 self.assertTrue(f.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200578 self.assertTrue(closed) # flush() called
579 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchaka437d5352015-02-23 00:28:38 +0200580 f.flush = lambda: None # break reference loop
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200581
582 def test_flush_error_on_close(self):
583 # raw file
584 # Issue #5700: io.FileIO calls flush() after file closed
585 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
586 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
587 self.check_flush_error_on_close(fd, 'wb', buffering=0)
588 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
589 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
590 os.close(fd)
591 # buffered io
592 self.check_flush_error_on_close(support.TESTFN, 'wb')
593 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
594 self.check_flush_error_on_close(fd, 'wb')
595 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
596 self.check_flush_error_on_close(fd, 'wb', closefd=False)
597 os.close(fd)
598 # text io
599 self.check_flush_error_on_close(support.TESTFN, 'w')
600 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
601 self.check_flush_error_on_close(fd, 'w')
602 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
603 self.check_flush_error_on_close(fd, 'w', closefd=False)
604 os.close(fd)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000605
606 def test_multi_close(self):
607 f = self.open(support.TESTFN, "wb", buffering=0)
608 f.close()
609 f.close()
610 f.close()
611 self.assertRaises(ValueError, f.flush)
612
Antoine Pitrou6391b342010-09-14 18:48:19 +0000613 def test_RawIOBase_read(self):
614 # Exercise the default RawIOBase.read() implementation (which calls
615 # readinto() internally).
616 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
617 self.assertEqual(rawio.read(2), b"ab")
618 self.assertEqual(rawio.read(2), b"c")
619 self.assertEqual(rawio.read(2), b"d")
620 self.assertEqual(rawio.read(2), None)
621 self.assertEqual(rawio.read(2), b"ef")
622 self.assertEqual(rawio.read(2), b"g")
623 self.assertEqual(rawio.read(2), None)
624 self.assertEqual(rawio.read(2), b"")
625
Hynek Schlawack877effc2012-05-25 09:24:18 +0200626 def test_fileio_closefd(self):
627 # Issue #4841
628 with self.open(__file__, 'rb') as f1, \
629 self.open(__file__, 'rb') as f2:
630 fileio = self.FileIO(f1.fileno(), closefd=False)
631 # .__init__() must not close f1
632 fileio.__init__(f2.fileno(), closefd=False)
633 f1.readline()
634 # .close() must not close f2
635 fileio.close()
636 f2.readline()
637
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +0300638 def test_nonbuffered_textio(self):
639 with warnings.catch_warnings(record=True) as recorded:
640 with self.assertRaises(ValueError):
641 self.open(support.TESTFN, 'w', buffering=0)
642 support.gc_collect()
643 self.assertEqual(recorded, [])
644
645 def test_invalid_newline(self):
646 with warnings.catch_warnings(record=True) as recorded:
647 with self.assertRaises(ValueError):
648 self.open(support.TESTFN, 'w', newline='invalid')
649 support.gc_collect()
650 self.assertEqual(recorded, [])
651
Hynek Schlawack877effc2012-05-25 09:24:18 +0200652
Antoine Pitrou19690592009-06-12 20:14:08 +0000653class CIOTest(IOTest):
Antoine Pitrou16166452011-07-12 22:04:20 +0200654
655 def test_IOBase_finalize(self):
656 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
657 # class which inherits IOBase and an object of this class are caught
658 # in a reference cycle and close() is already in the method cache.
659 class MyIO(self.IOBase):
660 def close(self):
661 pass
662
663 # create an instance to populate the method cache
664 MyIO()
665 obj = MyIO()
666 obj.obj = obj
667 wr = weakref.ref(obj)
668 del MyIO
669 del obj
670 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +0300671 self.assertIsNone(wr(), wr)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000672
Antoine Pitrou19690592009-06-12 20:14:08 +0000673class PyIOTest(IOTest):
674 test_array_writes = unittest.skip(
675 "len(array.array) returns number of elements rather than bytelength"
676 )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000677
678
Antoine Pitrou19690592009-06-12 20:14:08 +0000679class CommonBufferedTests:
680 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
681
682 def test_detach(self):
683 raw = self.MockRawIO()
684 buf = self.tp(raw)
685 self.assertIs(buf.detach(), raw)
686 self.assertRaises(ValueError, buf.detach)
687
Benjamin Peterson53ae6142014-12-21 20:51:50 -0600688 repr(buf) # Should still work
689
Antoine Pitrou19690592009-06-12 20:14:08 +0000690 def test_fileno(self):
691 rawio = self.MockRawIO()
692 bufio = self.tp(rawio)
693
Ezio Melotti2623a372010-11-21 13:34:58 +0000694 self.assertEqual(42, bufio.fileno())
Antoine Pitrou19690592009-06-12 20:14:08 +0000695
Zachary Ware1f702212013-12-10 14:09:20 -0600696 @unittest.skip('test having existential crisis')
Antoine Pitrou19690592009-06-12 20:14:08 +0000697 def test_no_fileno(self):
698 # XXX will we always have fileno() function? If so, kill
699 # this test. Else, write it.
700 pass
701
702 def test_invalid_args(self):
703 rawio = self.MockRawIO()
704 bufio = self.tp(rawio)
705 # Invalid whence
706 self.assertRaises(ValueError, bufio.seek, 0, -1)
707 self.assertRaises(ValueError, bufio.seek, 0, 3)
708
709 def test_override_destructor(self):
710 tp = self.tp
711 record = []
712 class MyBufferedIO(tp):
713 def __del__(self):
714 record.append(1)
715 try:
716 f = super(MyBufferedIO, self).__del__
717 except AttributeError:
718 pass
719 else:
720 f()
721 def close(self):
722 record.append(2)
723 super(MyBufferedIO, self).close()
724 def flush(self):
725 record.append(3)
726 super(MyBufferedIO, self).flush()
727 rawio = self.MockRawIO()
728 bufio = MyBufferedIO(rawio)
729 writable = bufio.writable()
730 del bufio
731 support.gc_collect()
732 if writable:
733 self.assertEqual(record, [1, 2, 3])
734 else:
735 self.assertEqual(record, [1, 2])
736
737 def test_context_manager(self):
738 # Test usability as a context manager
739 rawio = self.MockRawIO()
740 bufio = self.tp(rawio)
741 def _with():
742 with bufio:
743 pass
744 _with()
745 # bufio should now be closed, and using it a second time should raise
746 # a ValueError.
747 self.assertRaises(ValueError, _with)
748
749 def test_error_through_destructor(self):
750 # Test that the exception state is not modified by a destructor,
751 # even if close() fails.
752 rawio = self.CloseFailureIO()
753 def f():
754 self.tp(rawio).xyzzy
755 with support.captured_output("stderr") as s:
756 self.assertRaises(AttributeError, f)
757 s = s.getvalue().strip()
758 if s:
759 # The destructor *may* have printed an unraisable error, check it
760 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000761 self.assertTrue(s.startswith("Exception IOError: "), s)
762 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000763
764 def test_repr(self):
765 raw = self.MockRawIO()
766 b = self.tp(raw)
767 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
768 self.assertEqual(repr(b), "<%s>" % clsname)
769 raw.name = "dummy"
770 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
771 raw.name = b"dummy"
772 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000773
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000774 def test_flush_error_on_close(self):
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200775 # Test that buffered file is closed despite failed flush
776 # and that flush() is called before file closed.
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000777 raw = self.MockRawIO()
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200778 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000779 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200780 closed[:] = [b.closed, raw.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000781 raise IOError()
782 raw.flush = bad_flush
783 b = self.tp(raw)
784 self.assertRaises(IOError, b.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600785 self.assertTrue(b.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200786 self.assertTrue(raw.closed)
787 self.assertTrue(closed) # flush() called
788 self.assertFalse(closed[0]) # flush() called before file closed
789 self.assertFalse(closed[1])
Serhiy Storchaka437d5352015-02-23 00:28:38 +0200790 raw.flush = lambda: None # break reference loop
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600791
792 def test_close_error_on_close(self):
793 raw = self.MockRawIO()
794 def bad_flush():
795 raise IOError('flush')
796 def bad_close():
797 raise IOError('close')
798 raw.close = bad_close
799 b = self.tp(raw)
800 b.flush = bad_flush
801 with self.assertRaises(IOError) as err: # exception not swallowed
802 b.close()
803 self.assertEqual(err.exception.args, ('close',))
804 self.assertFalse(b.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000805
806 def test_multi_close(self):
807 raw = self.MockRawIO()
808 b = self.tp(raw)
809 b.close()
810 b.close()
811 b.close()
812 self.assertRaises(ValueError, b.flush)
813
Antoine Pitroufc9ead62010-12-21 21:26:55 +0000814 def test_readonly_attributes(self):
815 raw = self.MockRawIO()
816 buf = self.tp(raw)
817 x = self.MockRawIO()
818 with self.assertRaises((AttributeError, TypeError)):
819 buf.raw = x
820
Christian Heimes1a6387e2008-03-26 12:49:49 +0000821
Antoine Pitroubff5df02012-07-29 19:02:46 +0200822class SizeofTest:
823
824 @support.cpython_only
825 def test_sizeof(self):
826 bufsize1 = 4096
827 bufsize2 = 8192
828 rawio = self.MockRawIO()
829 bufio = self.tp(rawio, buffer_size=bufsize1)
830 size = sys.getsizeof(bufio) - bufsize1
831 rawio = self.MockRawIO()
832 bufio = self.tp(rawio, buffer_size=bufsize2)
833 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
834
835
Antoine Pitrou19690592009-06-12 20:14:08 +0000836class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
837 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000838
Antoine Pitrou19690592009-06-12 20:14:08 +0000839 def test_constructor(self):
840 rawio = self.MockRawIO([b"abc"])
841 bufio = self.tp(rawio)
842 bufio.__init__(rawio)
843 bufio.__init__(rawio, buffer_size=1024)
844 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000845 self.assertEqual(b"abc", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000846 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
847 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
848 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
849 rawio = self.MockRawIO([b"abc"])
850 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000851 self.assertEqual(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000852
Serhiy Storchaka1d19f972014-02-12 10:52:07 +0200853 def test_uninitialized(self):
854 bufio = self.tp.__new__(self.tp)
855 del bufio
856 bufio = self.tp.__new__(self.tp)
857 self.assertRaisesRegexp((ValueError, AttributeError),
858 'uninitialized|has no attribute',
859 bufio.read, 0)
860 bufio.__init__(self.MockRawIO())
861 self.assertEqual(bufio.read(0), b'')
862
Antoine Pitrou19690592009-06-12 20:14:08 +0000863 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000864 for arg in (None, 7):
865 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
866 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000867 self.assertEqual(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000868 # Invalid args
869 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000870
Antoine Pitrou19690592009-06-12 20:14:08 +0000871 def test_read1(self):
872 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
873 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000874 self.assertEqual(b"a", bufio.read(1))
875 self.assertEqual(b"b", bufio.read1(1))
876 self.assertEqual(rawio._reads, 1)
877 self.assertEqual(b"c", bufio.read1(100))
878 self.assertEqual(rawio._reads, 1)
879 self.assertEqual(b"d", bufio.read1(100))
880 self.assertEqual(rawio._reads, 2)
881 self.assertEqual(b"efg", bufio.read1(100))
882 self.assertEqual(rawio._reads, 3)
883 self.assertEqual(b"", bufio.read1(100))
884 self.assertEqual(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000885 # Invalid args
886 self.assertRaises(ValueError, bufio.read1, -1)
887
888 def test_readinto(self):
889 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
890 bufio = self.tp(rawio)
891 b = bytearray(2)
Ezio Melotti2623a372010-11-21 13:34:58 +0000892 self.assertEqual(bufio.readinto(b), 2)
893 self.assertEqual(b, b"ab")
894 self.assertEqual(bufio.readinto(b), 2)
895 self.assertEqual(b, b"cd")
896 self.assertEqual(bufio.readinto(b), 2)
897 self.assertEqual(b, b"ef")
898 self.assertEqual(bufio.readinto(b), 1)
899 self.assertEqual(b, b"gf")
900 self.assertEqual(bufio.readinto(b), 0)
901 self.assertEqual(b, b"gf")
Antoine Pitrou19690592009-06-12 20:14:08 +0000902
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000903 def test_readlines(self):
904 def bufio():
905 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
906 return self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000907 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
908 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
909 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000910
Antoine Pitrou19690592009-06-12 20:14:08 +0000911 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000912 data = b"abcdefghi"
913 dlen = len(data)
914
915 tests = [
916 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
917 [ 100, [ 3, 3, 3], [ dlen ] ],
918 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
919 ]
920
921 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000922 rawio = self.MockFileIO(data)
923 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000924 pos = 0
925 for nbytes in buf_read_sizes:
Ezio Melotti2623a372010-11-21 13:34:58 +0000926 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Christian Heimes1a6387e2008-03-26 12:49:49 +0000927 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000928 # this is mildly implementation-dependent
Ezio Melotti2623a372010-11-21 13:34:58 +0000929 self.assertEqual(rawio.read_history, raw_read_sizes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000930
Antoine Pitrou19690592009-06-12 20:14:08 +0000931 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000932 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000933 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
934 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000935 self.assertEqual(b"abcd", bufio.read(6))
936 self.assertEqual(b"e", bufio.read(1))
937 self.assertEqual(b"fg", bufio.read())
938 self.assertEqual(b"", bufio.peek(1))
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200939 self.assertIsNone(bufio.read())
Ezio Melotti2623a372010-11-21 13:34:58 +0000940 self.assertEqual(b"", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000941
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200942 rawio = self.MockRawIO((b"a", None, None))
943 self.assertEqual(b"a", rawio.readall())
944 self.assertIsNone(rawio.readall())
945
Antoine Pitrou19690592009-06-12 20:14:08 +0000946 def test_read_past_eof(self):
947 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
948 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000949
Ezio Melotti2623a372010-11-21 13:34:58 +0000950 self.assertEqual(b"abcdefg", bufio.read(9000))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000951
Antoine Pitrou19690592009-06-12 20:14:08 +0000952 def test_read_all(self):
953 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
954 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000955
Ezio Melotti2623a372010-11-21 13:34:58 +0000956 self.assertEqual(b"abcdefg", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000957
Victor Stinner6a102812010-04-27 23:55:59 +0000958 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +0000959 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +0000960 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000961 try:
962 # Write out many bytes with exactly the same number of 0's,
963 # 1's... 255's. This will help us check that concurrent reading
964 # doesn't duplicate or forget contents.
965 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +0000966 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000967 random.shuffle(l)
968 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000969 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000970 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000971 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000972 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000973 errors = []
974 results = []
975 def f():
976 try:
977 # Intra-buffer read then buffer-flushing read
978 for n in cycle([1, 19]):
979 s = bufio.read(n)
980 if not s:
981 break
982 # list.append() is atomic
983 results.append(s)
984 except Exception as e:
985 errors.append(e)
986 raise
987 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchakabd8c6292015-04-01 12:56:39 +0300988 with support.start_threads(threads):
989 time.sleep(0.02) # yield
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000990 self.assertFalse(errors,
991 "the following exceptions were caught: %r" % errors)
992 s = b''.join(results)
993 for i in range(256):
994 c = bytes(bytearray([i]))
995 self.assertEqual(s.count(c), N)
996 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +0000997 support.unlink(support.TESTFN)
998
999 def test_misbehaved_io(self):
1000 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1001 bufio = self.tp(rawio)
1002 self.assertRaises(IOError, bufio.seek, 0)
1003 self.assertRaises(IOError, bufio.tell)
1004
Antoine Pitroucb4f47c2010-08-11 13:40:17 +00001005 def test_no_extraneous_read(self):
1006 # Issue #9550; when the raw IO object has satisfied the read request,
1007 # we should not issue any additional reads, otherwise it may block
1008 # (e.g. socket).
1009 bufsize = 16
1010 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1011 rawio = self.MockRawIO([b"x" * n])
1012 bufio = self.tp(rawio, bufsize)
1013 self.assertEqual(bufio.read(n), b"x" * n)
1014 # Simple case: one raw read is enough to satisfy the request.
1015 self.assertEqual(rawio._extraneous_reads, 0,
1016 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1017 # A more complex case where two raw reads are needed to satisfy
1018 # the request.
1019 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1020 bufio = self.tp(rawio, bufsize)
1021 self.assertEqual(bufio.read(n), b"x" * n)
1022 self.assertEqual(rawio._extraneous_reads, 0,
1023 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1024
1025
Antoine Pitroubff5df02012-07-29 19:02:46 +02001026class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001027 tp = io.BufferedReader
1028
1029 def test_constructor(self):
1030 BufferedReaderTest.test_constructor(self)
1031 # The allocation can succeed on 32-bit builds, e.g. with more
1032 # than 2GB RAM and a 64-bit kernel.
1033 if sys.maxsize > 0x7FFFFFFF:
1034 rawio = self.MockRawIO()
1035 bufio = self.tp(rawio)
1036 self.assertRaises((OverflowError, MemoryError, ValueError),
1037 bufio.__init__, rawio, sys.maxsize)
1038
1039 def test_initialization(self):
1040 rawio = self.MockRawIO([b"abc"])
1041 bufio = self.tp(rawio)
1042 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1043 self.assertRaises(ValueError, bufio.read)
1044 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1045 self.assertRaises(ValueError, bufio.read)
1046 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1047 self.assertRaises(ValueError, bufio.read)
1048
1049 def test_misbehaved_io_read(self):
1050 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1051 bufio = self.tp(rawio)
1052 # _pyio.BufferedReader seems to implement reading different, so that
1053 # checking this is not so easy.
1054 self.assertRaises(IOError, bufio.read, 10)
1055
1056 def test_garbage_collection(self):
1057 # C BufferedReader objects are collected.
1058 # The Python version has __del__, so it ends into gc.garbage instead
1059 rawio = self.FileIO(support.TESTFN, "w+b")
1060 f = self.tp(rawio)
1061 f.f = f
1062 wr = weakref.ref(f)
1063 del f
1064 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03001065 self.assertIsNone(wr(), wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00001066
R David Murray5b2cf5e2013-02-23 22:11:21 -05001067 def test_args_error(self):
1068 # Issue #17275
1069 with self.assertRaisesRegexp(TypeError, "BufferedReader"):
1070 self.tp(io.BytesIO(), 1024, 1024, 1024)
1071
1072
Antoine Pitrou19690592009-06-12 20:14:08 +00001073class PyBufferedReaderTest(BufferedReaderTest):
1074 tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001075
1076
Antoine Pitrou19690592009-06-12 20:14:08 +00001077class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1078 write_mode = "wb"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001079
Antoine Pitrou19690592009-06-12 20:14:08 +00001080 def test_constructor(self):
1081 rawio = self.MockRawIO()
1082 bufio = self.tp(rawio)
1083 bufio.__init__(rawio)
1084 bufio.__init__(rawio, buffer_size=1024)
1085 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +00001086 self.assertEqual(3, bufio.write(b"abc"))
Antoine Pitrou19690592009-06-12 20:14:08 +00001087 bufio.flush()
1088 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1089 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1090 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1091 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +00001092 self.assertEqual(3, bufio.write(b"ghi"))
Antoine Pitrou19690592009-06-12 20:14:08 +00001093 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001094 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001095
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001096 def test_uninitialized(self):
1097 bufio = self.tp.__new__(self.tp)
1098 del bufio
1099 bufio = self.tp.__new__(self.tp)
1100 self.assertRaisesRegexp((ValueError, AttributeError),
1101 'uninitialized|has no attribute',
1102 bufio.write, b'')
1103 bufio.__init__(self.MockRawIO())
1104 self.assertEqual(bufio.write(b''), 0)
1105
Antoine Pitrou19690592009-06-12 20:14:08 +00001106 def test_detach_flush(self):
1107 raw = self.MockRawIO()
1108 buf = self.tp(raw)
1109 buf.write(b"howdy!")
1110 self.assertFalse(raw._write_stack)
1111 buf.detach()
1112 self.assertEqual(raw._write_stack, [b"howdy!"])
1113
1114 def test_write(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001115 # Write to the buffered IO but don't overflow the buffer.
Antoine Pitrou19690592009-06-12 20:14:08 +00001116 writer = self.MockRawIO()
1117 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001118 bufio.write(b"abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001119 self.assertFalse(writer._write_stack)
1120
Antoine Pitrou19690592009-06-12 20:14:08 +00001121 def test_write_overflow(self):
1122 writer = self.MockRawIO()
1123 bufio = self.tp(writer, 8)
1124 contents = b"abcdefghijklmnop"
1125 for n in range(0, len(contents), 3):
1126 bufio.write(contents[n:n+3])
1127 flushed = b"".join(writer._write_stack)
1128 # At least (total - 8) bytes were implicitly flushed, perhaps more
1129 # depending on the implementation.
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001130 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001131
Antoine Pitrou19690592009-06-12 20:14:08 +00001132 def check_writes(self, intermediate_func):
1133 # Lots of writes, test the flushed output is as expected.
1134 contents = bytes(range(256)) * 1000
1135 n = 0
1136 writer = self.MockRawIO()
1137 bufio = self.tp(writer, 13)
1138 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1139 def gen_sizes():
1140 for size in count(1):
1141 for i in range(15):
1142 yield size
1143 sizes = gen_sizes()
1144 while n < len(contents):
1145 size = min(next(sizes), len(contents) - n)
Ezio Melotti2623a372010-11-21 13:34:58 +00001146 self.assertEqual(bufio.write(contents[n:n+size]), size)
Antoine Pitrou19690592009-06-12 20:14:08 +00001147 intermediate_func(bufio)
1148 n += size
1149 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001150 self.assertEqual(contents,
Antoine Pitrou19690592009-06-12 20:14:08 +00001151 b"".join(writer._write_stack))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001152
Antoine Pitrou19690592009-06-12 20:14:08 +00001153 def test_writes(self):
1154 self.check_writes(lambda bufio: None)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001155
Antoine Pitrou19690592009-06-12 20:14:08 +00001156 def test_writes_and_flushes(self):
1157 self.check_writes(lambda bufio: bufio.flush())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001158
Antoine Pitrou19690592009-06-12 20:14:08 +00001159 def test_writes_and_seeks(self):
1160 def _seekabs(bufio):
1161 pos = bufio.tell()
1162 bufio.seek(pos + 1, 0)
1163 bufio.seek(pos - 1, 0)
1164 bufio.seek(pos, 0)
1165 self.check_writes(_seekabs)
1166 def _seekrel(bufio):
1167 pos = bufio.seek(0, 1)
1168 bufio.seek(+1, 1)
1169 bufio.seek(-1, 1)
1170 bufio.seek(pos, 0)
1171 self.check_writes(_seekrel)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001172
Antoine Pitrou19690592009-06-12 20:14:08 +00001173 def test_writes_and_truncates(self):
1174 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001175
Antoine Pitrou19690592009-06-12 20:14:08 +00001176 def test_write_non_blocking(self):
1177 raw = self.MockNonBlockWriterIO()
1178 bufio = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001179
Ezio Melotti2623a372010-11-21 13:34:58 +00001180 self.assertEqual(bufio.write(b"abcd"), 4)
1181 self.assertEqual(bufio.write(b"efghi"), 5)
Antoine Pitrou19690592009-06-12 20:14:08 +00001182 # 1 byte will be written, the rest will be buffered
1183 raw.block_on(b"k")
Ezio Melotti2623a372010-11-21 13:34:58 +00001184 self.assertEqual(bufio.write(b"jklmn"), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001185
Antoine Pitrou19690592009-06-12 20:14:08 +00001186 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1187 raw.block_on(b"0")
1188 try:
1189 bufio.write(b"opqrwxyz0123456789")
1190 except self.BlockingIOError as e:
1191 written = e.characters_written
1192 else:
1193 self.fail("BlockingIOError should have been raised")
Ezio Melotti2623a372010-11-21 13:34:58 +00001194 self.assertEqual(written, 16)
1195 self.assertEqual(raw.pop_written(),
Antoine Pitrou19690592009-06-12 20:14:08 +00001196 b"abcdefghijklmnopqrwxyz")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001197
Ezio Melotti2623a372010-11-21 13:34:58 +00001198 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Antoine Pitrou19690592009-06-12 20:14:08 +00001199 s = raw.pop_written()
1200 # Previously buffered bytes were flushed
1201 self.assertTrue(s.startswith(b"01234567A"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001202
Antoine Pitrou19690592009-06-12 20:14:08 +00001203 def test_write_and_rewind(self):
1204 raw = io.BytesIO()
1205 bufio = self.tp(raw, 4)
1206 self.assertEqual(bufio.write(b"abcdef"), 6)
1207 self.assertEqual(bufio.tell(), 6)
1208 bufio.seek(0, 0)
1209 self.assertEqual(bufio.write(b"XY"), 2)
1210 bufio.seek(6, 0)
1211 self.assertEqual(raw.getvalue(), b"XYcdef")
1212 self.assertEqual(bufio.write(b"123456"), 6)
1213 bufio.flush()
1214 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001215
Antoine Pitrou19690592009-06-12 20:14:08 +00001216 def test_flush(self):
1217 writer = self.MockRawIO()
1218 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001219 bufio.write(b"abc")
1220 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001221 self.assertEqual(b"abc", writer._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001222
Antoine Pitrou78e761e2012-10-16 22:57:11 +02001223 def test_writelines(self):
1224 l = [b'ab', b'cd', b'ef']
1225 writer = self.MockRawIO()
1226 bufio = self.tp(writer, 8)
1227 bufio.writelines(l)
1228 bufio.flush()
1229 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1230
1231 def test_writelines_userlist(self):
1232 l = UserList([b'ab', b'cd', b'ef'])
1233 writer = self.MockRawIO()
1234 bufio = self.tp(writer, 8)
1235 bufio.writelines(l)
1236 bufio.flush()
1237 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1238
1239 def test_writelines_error(self):
1240 writer = self.MockRawIO()
1241 bufio = self.tp(writer, 8)
1242 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1243 self.assertRaises(TypeError, bufio.writelines, None)
1244
Antoine Pitrou19690592009-06-12 20:14:08 +00001245 def test_destructor(self):
1246 writer = self.MockRawIO()
1247 bufio = self.tp(writer, 8)
1248 bufio.write(b"abc")
1249 del bufio
1250 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001251 self.assertEqual(b"abc", writer._write_stack[0])
Antoine Pitrou19690592009-06-12 20:14:08 +00001252
1253 def test_truncate(self):
1254 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001255 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001256 bufio = self.tp(raw, 8)
1257 bufio.write(b"abcdef")
1258 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001259 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001260 with self.open(support.TESTFN, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001261 self.assertEqual(f.read(), b"abc")
1262
Victor Stinner6a102812010-04-27 23:55:59 +00001263 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +00001264 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +00001265 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001266 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001267 # Write out many bytes from many threads and test they were
1268 # all flushed.
1269 N = 1000
1270 contents = bytes(range(256)) * N
1271 sizes = cycle([1, 19])
1272 n = 0
1273 queue = deque()
1274 while n < len(contents):
1275 size = next(sizes)
1276 queue.append(contents[n:n+size])
1277 n += size
1278 del contents
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001279 # We use a real file object because it allows us to
1280 # exercise situations where the GIL is released before
1281 # writing the buffer to the raw streams. This is in addition
1282 # to concurrency issues due to switching threads in the middle
1283 # of Python code.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001284 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001285 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001286 errors = []
1287 def f():
1288 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001289 while True:
1290 try:
1291 s = queue.popleft()
1292 except IndexError:
1293 return
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001294 bufio.write(s)
1295 except Exception as e:
1296 errors.append(e)
1297 raise
1298 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03001299 with support.start_threads(threads):
1300 time.sleep(0.02) # yield
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001301 self.assertFalse(errors,
1302 "the following exceptions were caught: %r" % errors)
Antoine Pitrou19690592009-06-12 20:14:08 +00001303 bufio.close()
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001304 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001305 s = f.read()
1306 for i in range(256):
Ezio Melotti2623a372010-11-21 13:34:58 +00001307 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001308 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001309 support.unlink(support.TESTFN)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001310
Antoine Pitrou19690592009-06-12 20:14:08 +00001311 def test_misbehaved_io(self):
1312 rawio = self.MisbehavedRawIO()
1313 bufio = self.tp(rawio, 5)
1314 self.assertRaises(IOError, bufio.seek, 0)
1315 self.assertRaises(IOError, bufio.tell)
1316 self.assertRaises(IOError, bufio.write, b"abcdef")
1317
1318 def test_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001319 with support.check_warnings(("max_buffer_size is deprecated",
1320 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001321 self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001322
Benjamin Petersona2d6d712012-12-20 12:24:10 -06001323 def test_write_error_on_close(self):
1324 raw = self.MockRawIO()
1325 def bad_write(b):
1326 raise IOError()
1327 raw.write = bad_write
1328 b = self.tp(raw)
1329 b.write(b'spam')
1330 self.assertRaises(IOError, b.close) # exception not swallowed
1331 self.assertTrue(b.closed)
1332
Antoine Pitrou19690592009-06-12 20:14:08 +00001333
Antoine Pitroubff5df02012-07-29 19:02:46 +02001334class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001335 tp = io.BufferedWriter
1336
1337 def test_constructor(self):
1338 BufferedWriterTest.test_constructor(self)
1339 # The allocation can succeed on 32-bit builds, e.g. with more
1340 # than 2GB RAM and a 64-bit kernel.
1341 if sys.maxsize > 0x7FFFFFFF:
1342 rawio = self.MockRawIO()
1343 bufio = self.tp(rawio)
1344 self.assertRaises((OverflowError, MemoryError, ValueError),
1345 bufio.__init__, rawio, sys.maxsize)
1346
1347 def test_initialization(self):
1348 rawio = self.MockRawIO()
1349 bufio = self.tp(rawio)
1350 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1351 self.assertRaises(ValueError, bufio.write, b"def")
1352 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1353 self.assertRaises(ValueError, bufio.write, b"def")
1354 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1355 self.assertRaises(ValueError, bufio.write, b"def")
1356
1357 def test_garbage_collection(self):
1358 # C BufferedWriter objects are collected, and collecting them flushes
1359 # all data to disk.
1360 # The Python version has __del__, so it ends into gc.garbage instead
1361 rawio = self.FileIO(support.TESTFN, "w+b")
1362 f = self.tp(rawio)
1363 f.write(b"123xxx")
1364 f.x = f
1365 wr = weakref.ref(f)
1366 del f
1367 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03001368 self.assertIsNone(wr(), wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001369 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001370 self.assertEqual(f.read(), b"123xxx")
1371
R David Murray5b2cf5e2013-02-23 22:11:21 -05001372 def test_args_error(self):
1373 # Issue #17275
1374 with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
1375 self.tp(io.BytesIO(), 1024, 1024, 1024)
1376
Antoine Pitrou19690592009-06-12 20:14:08 +00001377
1378class PyBufferedWriterTest(BufferedWriterTest):
1379 tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001380
1381class BufferedRWPairTest(unittest.TestCase):
1382
Antoine Pitrou19690592009-06-12 20:14:08 +00001383 def test_constructor(self):
1384 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001385 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001386
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001387 def test_uninitialized(self):
1388 pair = self.tp.__new__(self.tp)
1389 del pair
1390 pair = self.tp.__new__(self.tp)
1391 self.assertRaisesRegexp((ValueError, AttributeError),
1392 'uninitialized|has no attribute',
1393 pair.read, 0)
1394 self.assertRaisesRegexp((ValueError, AttributeError),
1395 'uninitialized|has no attribute',
1396 pair.write, b'')
1397 pair.__init__(self.MockRawIO(), self.MockRawIO())
1398 self.assertEqual(pair.read(0), b'')
1399 self.assertEqual(pair.write(b''), 0)
1400
Antoine Pitrou19690592009-06-12 20:14:08 +00001401 def test_detach(self):
1402 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1403 self.assertRaises(self.UnsupportedOperation, pair.detach)
1404
1405 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001406 with support.check_warnings(("max_buffer_size is deprecated",
1407 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001408 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001409
1410 def test_constructor_with_not_readable(self):
1411 class NotReadable(MockRawIO):
1412 def readable(self):
1413 return False
1414
1415 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1416
1417 def test_constructor_with_not_writeable(self):
1418 class NotWriteable(MockRawIO):
1419 def writable(self):
1420 return False
1421
1422 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1423
1424 def test_read(self):
1425 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1426
1427 self.assertEqual(pair.read(3), b"abc")
1428 self.assertEqual(pair.read(1), b"d")
1429 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001430 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1431 self.assertEqual(pair.read(None), b"abc")
1432
1433 def test_readlines(self):
1434 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1435 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1436 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1437 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001438
1439 def test_read1(self):
1440 # .read1() is delegated to the underlying reader object, so this test
1441 # can be shallow.
1442 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1443
1444 self.assertEqual(pair.read1(3), b"abc")
1445
1446 def test_readinto(self):
1447 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1448
1449 data = bytearray(5)
1450 self.assertEqual(pair.readinto(data), 5)
1451 self.assertEqual(data, b"abcde")
1452
1453 def test_write(self):
1454 w = self.MockRawIO()
1455 pair = self.tp(self.MockRawIO(), w)
1456
1457 pair.write(b"abc")
1458 pair.flush()
1459 pair.write(b"def")
1460 pair.flush()
1461 self.assertEqual(w._write_stack, [b"abc", b"def"])
1462
1463 def test_peek(self):
1464 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1465
1466 self.assertTrue(pair.peek(3).startswith(b"abc"))
1467 self.assertEqual(pair.read(3), b"abc")
1468
1469 def test_readable(self):
1470 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1471 self.assertTrue(pair.readable())
1472
1473 def test_writeable(self):
1474 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1475 self.assertTrue(pair.writable())
1476
1477 def test_seekable(self):
1478 # BufferedRWPairs are never seekable, even if their readers and writers
1479 # are.
1480 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1481 self.assertFalse(pair.seekable())
1482
1483 # .flush() is delegated to the underlying writer object and has been
1484 # tested in the test_write method.
1485
1486 def test_close_and_closed(self):
1487 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1488 self.assertFalse(pair.closed)
1489 pair.close()
1490 self.assertTrue(pair.closed)
1491
Serhiy Storchakaf95a57f2015-03-24 23:23:42 +02001492 def test_reader_close_error_on_close(self):
1493 def reader_close():
1494 reader_non_existing
1495 reader = self.MockRawIO()
1496 reader.close = reader_close
1497 writer = self.MockRawIO()
1498 pair = self.tp(reader, writer)
1499 with self.assertRaises(NameError) as err:
1500 pair.close()
1501 self.assertIn('reader_non_existing', str(err.exception))
1502 self.assertTrue(pair.closed)
1503 self.assertFalse(reader.closed)
1504 self.assertTrue(writer.closed)
1505
1506 def test_writer_close_error_on_close(self):
1507 def writer_close():
1508 writer_non_existing
1509 reader = self.MockRawIO()
1510 writer = self.MockRawIO()
1511 writer.close = writer_close
1512 pair = self.tp(reader, writer)
1513 with self.assertRaises(NameError) as err:
1514 pair.close()
1515 self.assertIn('writer_non_existing', str(err.exception))
1516 self.assertFalse(pair.closed)
1517 self.assertTrue(reader.closed)
1518 self.assertFalse(writer.closed)
1519
1520 def test_reader_writer_close_error_on_close(self):
1521 def reader_close():
1522 reader_non_existing
1523 def writer_close():
1524 writer_non_existing
1525 reader = self.MockRawIO()
1526 reader.close = reader_close
1527 writer = self.MockRawIO()
1528 writer.close = writer_close
1529 pair = self.tp(reader, writer)
1530 with self.assertRaises(NameError) as err:
1531 pair.close()
1532 self.assertIn('reader_non_existing', str(err.exception))
1533 self.assertFalse(pair.closed)
1534 self.assertFalse(reader.closed)
1535 self.assertFalse(writer.closed)
1536
Antoine Pitrou19690592009-06-12 20:14:08 +00001537 def test_isatty(self):
1538 class SelectableIsAtty(MockRawIO):
1539 def __init__(self, isatty):
1540 MockRawIO.__init__(self)
1541 self._isatty = isatty
1542
1543 def isatty(self):
1544 return self._isatty
1545
1546 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1547 self.assertFalse(pair.isatty())
1548
1549 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1550 self.assertTrue(pair.isatty())
1551
1552 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1553 self.assertTrue(pair.isatty())
1554
1555 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1556 self.assertTrue(pair.isatty())
1557
Benjamin Peterson1c873bf2014-09-29 22:46:57 -04001558 def test_weakref_clearing(self):
1559 brw = self.tp(self.MockRawIO(), self.MockRawIO())
1560 ref = weakref.ref(brw)
1561 brw = None
1562 ref = None # Shouldn't segfault.
1563
Antoine Pitrou19690592009-06-12 20:14:08 +00001564class CBufferedRWPairTest(BufferedRWPairTest):
1565 tp = io.BufferedRWPair
1566
1567class PyBufferedRWPairTest(BufferedRWPairTest):
1568 tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001569
1570
Antoine Pitrou19690592009-06-12 20:14:08 +00001571class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1572 read_mode = "rb+"
1573 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001574
Antoine Pitrou19690592009-06-12 20:14:08 +00001575 def test_constructor(self):
1576 BufferedReaderTest.test_constructor(self)
1577 BufferedWriterTest.test_constructor(self)
1578
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001579 def test_uninitialized(self):
1580 BufferedReaderTest.test_uninitialized(self)
1581 BufferedWriterTest.test_uninitialized(self)
1582
Antoine Pitrou19690592009-06-12 20:14:08 +00001583 def test_read_and_write(self):
1584 raw = self.MockRawIO((b"asdf", b"ghjk"))
1585 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001586
1587 self.assertEqual(b"as", rw.read(2))
1588 rw.write(b"ddd")
1589 rw.write(b"eee")
1590 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001591 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001592 self.assertEqual(b"dddeee", raw._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001593
Antoine Pitrou19690592009-06-12 20:14:08 +00001594 def test_seek_and_tell(self):
1595 raw = self.BytesIO(b"asdfghjkl")
1596 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001597
Ezio Melotti2623a372010-11-21 13:34:58 +00001598 self.assertEqual(b"as", rw.read(2))
1599 self.assertEqual(2, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001600 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001601 self.assertEqual(b"asdf", rw.read(4))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001602
Antoine Pitrou808cec52011-08-20 15:40:58 +02001603 rw.write(b"123f")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001604 rw.seek(0, 0)
Antoine Pitrou808cec52011-08-20 15:40:58 +02001605 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001606 self.assertEqual(9, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001607 rw.seek(-4, 2)
Ezio Melotti2623a372010-11-21 13:34:58 +00001608 self.assertEqual(5, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001609 rw.seek(2, 1)
Ezio Melotti2623a372010-11-21 13:34:58 +00001610 self.assertEqual(7, rw.tell())
1611 self.assertEqual(b"fl", rw.read(11))
Antoine Pitrou808cec52011-08-20 15:40:58 +02001612 rw.flush()
1613 self.assertEqual(b"asdf123fl", raw.getvalue())
1614
Christian Heimes1a6387e2008-03-26 12:49:49 +00001615 self.assertRaises(TypeError, rw.seek, 0.0)
1616
Antoine Pitrou19690592009-06-12 20:14:08 +00001617 def check_flush_and_read(self, read_func):
1618 raw = self.BytesIO(b"abcdefghi")
1619 bufio = self.tp(raw)
1620
Ezio Melotti2623a372010-11-21 13:34:58 +00001621 self.assertEqual(b"ab", read_func(bufio, 2))
Antoine Pitrou19690592009-06-12 20:14:08 +00001622 bufio.write(b"12")
Ezio Melotti2623a372010-11-21 13:34:58 +00001623 self.assertEqual(b"ef", read_func(bufio, 2))
1624 self.assertEqual(6, bufio.tell())
Antoine Pitrou19690592009-06-12 20:14:08 +00001625 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001626 self.assertEqual(6, bufio.tell())
1627 self.assertEqual(b"ghi", read_func(bufio))
Antoine Pitrou19690592009-06-12 20:14:08 +00001628 raw.seek(0, 0)
1629 raw.write(b"XYZ")
1630 # flush() resets the read buffer
1631 bufio.flush()
1632 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001633 self.assertEqual(b"XYZ", read_func(bufio, 3))
Antoine Pitrou19690592009-06-12 20:14:08 +00001634
1635 def test_flush_and_read(self):
1636 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1637
1638 def test_flush_and_readinto(self):
1639 def _readinto(bufio, n=-1):
1640 b = bytearray(n if n >= 0 else 9999)
1641 n = bufio.readinto(b)
1642 return bytes(b[:n])
1643 self.check_flush_and_read(_readinto)
1644
1645 def test_flush_and_peek(self):
1646 def _peek(bufio, n=-1):
1647 # This relies on the fact that the buffer can contain the whole
1648 # raw stream, otherwise peek() can return less.
1649 b = bufio.peek(n)
1650 if n != -1:
1651 b = b[:n]
1652 bufio.seek(len(b), 1)
1653 return b
1654 self.check_flush_and_read(_peek)
1655
1656 def test_flush_and_write(self):
1657 raw = self.BytesIO(b"abcdefghi")
1658 bufio = self.tp(raw)
1659
1660 bufio.write(b"123")
1661 bufio.flush()
1662 bufio.write(b"45")
1663 bufio.flush()
1664 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001665 self.assertEqual(b"12345fghi", raw.getvalue())
1666 self.assertEqual(b"12345fghi", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +00001667
1668 def test_threads(self):
1669 BufferedReaderTest.test_threads(self)
1670 BufferedWriterTest.test_threads(self)
1671
1672 def test_writes_and_peek(self):
1673 def _peek(bufio):
1674 bufio.peek(1)
1675 self.check_writes(_peek)
1676 def _peek(bufio):
1677 pos = bufio.tell()
1678 bufio.seek(-1, 1)
1679 bufio.peek(1)
1680 bufio.seek(pos, 0)
1681 self.check_writes(_peek)
1682
1683 def test_writes_and_reads(self):
1684 def _read(bufio):
1685 bufio.seek(-1, 1)
1686 bufio.read(1)
1687 self.check_writes(_read)
1688
1689 def test_writes_and_read1s(self):
1690 def _read1(bufio):
1691 bufio.seek(-1, 1)
1692 bufio.read1(1)
1693 self.check_writes(_read1)
1694
1695 def test_writes_and_readintos(self):
1696 def _read(bufio):
1697 bufio.seek(-1, 1)
1698 bufio.readinto(bytearray(1))
1699 self.check_writes(_read)
1700
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001701 def test_write_after_readahead(self):
1702 # Issue #6629: writing after the buffer was filled by readahead should
1703 # first rewind the raw stream.
1704 for overwrite_size in [1, 5]:
1705 raw = self.BytesIO(b"A" * 10)
1706 bufio = self.tp(raw, 4)
1707 # Trigger readahead
1708 self.assertEqual(bufio.read(1), b"A")
1709 self.assertEqual(bufio.tell(), 1)
1710 # Overwriting should rewind the raw stream if it needs so
1711 bufio.write(b"B" * overwrite_size)
1712 self.assertEqual(bufio.tell(), overwrite_size + 1)
1713 # If the write size was smaller than the buffer size, flush() and
1714 # check that rewind happens.
1715 bufio.flush()
1716 self.assertEqual(bufio.tell(), overwrite_size + 1)
1717 s = raw.getvalue()
1718 self.assertEqual(s,
1719 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1720
Antoine Pitrouee46a7b2011-05-13 00:31:52 +02001721 def test_write_rewind_write(self):
1722 # Various combinations of reading / writing / seeking backwards / writing again
1723 def mutate(bufio, pos1, pos2):
1724 assert pos2 >= pos1
1725 # Fill the buffer
1726 bufio.seek(pos1)
1727 bufio.read(pos2 - pos1)
1728 bufio.write(b'\x02')
1729 # This writes earlier than the previous write, but still inside
1730 # the buffer.
1731 bufio.seek(pos1)
1732 bufio.write(b'\x01')
1733
1734 b = b"\x80\x81\x82\x83\x84"
1735 for i in range(0, len(b)):
1736 for j in range(i, len(b)):
1737 raw = self.BytesIO(b)
1738 bufio = self.tp(raw, 100)
1739 mutate(bufio, i, j)
1740 bufio.flush()
1741 expected = bytearray(b)
1742 expected[j] = 2
1743 expected[i] = 1
1744 self.assertEqual(raw.getvalue(), expected,
1745 "failed result for i=%d, j=%d" % (i, j))
1746
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001747 def test_truncate_after_read_or_write(self):
1748 raw = self.BytesIO(b"A" * 10)
1749 bufio = self.tp(raw, 100)
1750 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1751 self.assertEqual(bufio.truncate(), 2)
1752 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1753 self.assertEqual(bufio.truncate(), 4)
1754
Antoine Pitrou19690592009-06-12 20:14:08 +00001755 def test_misbehaved_io(self):
1756 BufferedReaderTest.test_misbehaved_io(self)
1757 BufferedWriterTest.test_misbehaved_io(self)
1758
Antoine Pitrou808cec52011-08-20 15:40:58 +02001759 def test_interleaved_read_write(self):
1760 # Test for issue #12213
1761 with self.BytesIO(b'abcdefgh') as raw:
1762 with self.tp(raw, 100) as f:
1763 f.write(b"1")
1764 self.assertEqual(f.read(1), b'b')
1765 f.write(b'2')
1766 self.assertEqual(f.read1(1), b'd')
1767 f.write(b'3')
1768 buf = bytearray(1)
1769 f.readinto(buf)
1770 self.assertEqual(buf, b'f')
1771 f.write(b'4')
1772 self.assertEqual(f.peek(1), b'h')
1773 f.flush()
1774 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1775
1776 with self.BytesIO(b'abc') as raw:
1777 with self.tp(raw, 100) as f:
1778 self.assertEqual(f.read(1), b'a')
1779 f.write(b"2")
1780 self.assertEqual(f.read(1), b'c')
1781 f.flush()
1782 self.assertEqual(raw.getvalue(), b'a2c')
1783
1784 def test_interleaved_readline_write(self):
1785 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1786 with self.tp(raw) as f:
1787 f.write(b'1')
1788 self.assertEqual(f.readline(), b'b\n')
1789 f.write(b'2')
1790 self.assertEqual(f.readline(), b'def\n')
1791 f.write(b'3')
1792 self.assertEqual(f.readline(), b'\n')
1793 f.flush()
1794 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1795
R David Murray5b2cf5e2013-02-23 22:11:21 -05001796
Antoine Pitroubff5df02012-07-29 19:02:46 +02001797class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
1798 BufferedRandomTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001799 tp = io.BufferedRandom
1800
1801 def test_constructor(self):
1802 BufferedRandomTest.test_constructor(self)
1803 # The allocation can succeed on 32-bit builds, e.g. with more
1804 # than 2GB RAM and a 64-bit kernel.
1805 if sys.maxsize > 0x7FFFFFFF:
1806 rawio = self.MockRawIO()
1807 bufio = self.tp(rawio)
1808 self.assertRaises((OverflowError, MemoryError, ValueError),
1809 bufio.__init__, rawio, sys.maxsize)
1810
1811 def test_garbage_collection(self):
1812 CBufferedReaderTest.test_garbage_collection(self)
1813 CBufferedWriterTest.test_garbage_collection(self)
1814
R David Murray5b2cf5e2013-02-23 22:11:21 -05001815 def test_args_error(self):
1816 # Issue #17275
1817 with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
1818 self.tp(io.BytesIO(), 1024, 1024, 1024)
1819
1820
Antoine Pitrou19690592009-06-12 20:14:08 +00001821class PyBufferedRandomTest(BufferedRandomTest):
1822 tp = pyio.BufferedRandom
1823
1824
Christian Heimes1a6387e2008-03-26 12:49:49 +00001825# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1826# properties:
1827# - A single output character can correspond to many bytes of input.
1828# - The number of input bytes to complete the character can be
1829# undetermined until the last input byte is received.
1830# - The number of input bytes can vary depending on previous input.
1831# - A single input byte can correspond to many characters of output.
1832# - The number of output characters can be undetermined until the
1833# last input byte is received.
1834# - The number of output characters can vary depending on previous input.
1835
1836class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1837 """
1838 For testing seek/tell behavior with a stateful, buffering decoder.
1839
1840 Input is a sequence of words. Words may be fixed-length (length set
1841 by input) or variable-length (period-terminated). In variable-length
1842 mode, extra periods are ignored. Possible words are:
1843 - 'i' followed by a number sets the input length, I (maximum 99).
1844 When I is set to 0, words are space-terminated.
1845 - 'o' followed by a number sets the output length, O (maximum 99).
1846 - Any other word is converted into a word followed by a period on
1847 the output. The output word consists of the input word truncated
1848 or padded out with hyphens to make its length equal to O. If O
1849 is 0, the word is output verbatim without truncating or padding.
1850 I and O are initially set to 1. When I changes, any buffered input is
1851 re-scanned according to the new I. EOF also terminates the last word.
1852 """
1853
1854 def __init__(self, errors='strict'):
1855 codecs.IncrementalDecoder.__init__(self, errors)
1856 self.reset()
1857
1858 def __repr__(self):
1859 return '<SID %x>' % id(self)
1860
1861 def reset(self):
1862 self.i = 1
1863 self.o = 1
1864 self.buffer = bytearray()
1865
1866 def getstate(self):
1867 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1868 return bytes(self.buffer), i*100 + o
1869
1870 def setstate(self, state):
1871 buffer, io = state
1872 self.buffer = bytearray(buffer)
1873 i, o = divmod(io, 100)
1874 self.i, self.o = i ^ 1, o ^ 1
1875
1876 def decode(self, input, final=False):
1877 output = ''
1878 for b in input:
1879 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001880 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +00001881 if self.buffer:
1882 output += self.process_word()
1883 else:
1884 self.buffer.append(b)
1885 else: # fixed-length, terminate after self.i bytes
1886 self.buffer.append(b)
1887 if len(self.buffer) == self.i:
1888 output += self.process_word()
1889 if final and self.buffer: # EOF terminates the last word
1890 output += self.process_word()
1891 return output
1892
1893 def process_word(self):
1894 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001895 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001896 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001897 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001898 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1899 else:
1900 output = self.buffer.decode('ascii')
1901 if len(output) < self.o:
1902 output += '-'*self.o # pad out with hyphens
1903 if self.o:
1904 output = output[:self.o] # truncate to output length
1905 output += '.'
1906 self.buffer = bytearray()
1907 return output
1908
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001909 codecEnabled = False
1910
1911 @classmethod
1912 def lookupTestDecoder(cls, name):
1913 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001914 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001915 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001916 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001917 incrementalencoder=None,
1918 streamreader=None, streamwriter=None,
1919 incrementaldecoder=cls)
1920
1921# Register the previous decoder for testing.
1922# Disabled by default, tests will enable it.
1923codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1924
1925
Christian Heimes1a6387e2008-03-26 12:49:49 +00001926class StatefulIncrementalDecoderTest(unittest.TestCase):
1927 """
1928 Make sure the StatefulIncrementalDecoder actually works.
1929 """
1930
1931 test_cases = [
1932 # I=1, O=1 (fixed-length input == fixed-length output)
1933 (b'abcd', False, 'a.b.c.d.'),
1934 # I=0, O=0 (variable-length input, variable-length output)
1935 (b'oiabcd', True, 'abcd.'),
1936 # I=0, O=0 (should ignore extra periods)
1937 (b'oi...abcd...', True, 'abcd.'),
1938 # I=0, O=6 (variable-length input, fixed-length output)
1939 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1940 # I=2, O=6 (fixed-length input < fixed-length output)
1941 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1942 # I=6, O=3 (fixed-length input > fixed-length output)
1943 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1944 # I=0, then 3; O=29, then 15 (with longer output)
1945 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1946 'a----------------------------.' +
1947 'b----------------------------.' +
1948 'cde--------------------------.' +
1949 'abcdefghijabcde.' +
1950 'a.b------------.' +
1951 '.c.------------.' +
1952 'd.e------------.' +
1953 'k--------------.' +
1954 'l--------------.' +
1955 'm--------------.')
1956 ]
1957
Antoine Pitrou19690592009-06-12 20:14:08 +00001958 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001959 # Try a few one-shot test cases.
1960 for input, eof, output in self.test_cases:
1961 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001962 self.assertEqual(d.decode(input, eof), output)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001963
1964 # Also test an unfinished decode, followed by forcing EOF.
1965 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001966 self.assertEqual(d.decode(b'oiabcd'), '')
1967 self.assertEqual(d.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001968
1969class TextIOWrapperTest(unittest.TestCase):
1970
1971 def setUp(self):
1972 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1973 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00001974 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001975
1976 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001977 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001978
Antoine Pitrou19690592009-06-12 20:14:08 +00001979 def test_constructor(self):
1980 r = self.BytesIO(b"\xc3\xa9\n\n")
1981 b = self.BufferedReader(r, 1000)
1982 t = self.TextIOWrapper(b)
1983 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti2623a372010-11-21 13:34:58 +00001984 self.assertEqual(t.encoding, "latin1")
1985 self.assertEqual(t.line_buffering, False)
Antoine Pitrou19690592009-06-12 20:14:08 +00001986 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti2623a372010-11-21 13:34:58 +00001987 self.assertEqual(t.encoding, "utf8")
1988 self.assertEqual(t.line_buffering, True)
1989 self.assertEqual("\xe9\n", t.readline())
Antoine Pitrou19690592009-06-12 20:14:08 +00001990 self.assertRaises(TypeError, t.__init__, b, newline=42)
1991 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1992
Serhiy Storchakaabb7e652015-04-16 11:56:35 +03001993 def test_uninitialized(self):
1994 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
1995 del t
1996 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
1997 self.assertRaises(Exception, repr, t)
1998 self.assertRaisesRegexp((ValueError, AttributeError),
1999 'uninitialized|has no attribute',
2000 t.read, 0)
2001 t.__init__(self.MockRawIO())
2002 self.assertEqual(t.read(0), u'')
2003
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002004 def test_non_text_encoding_codecs_are_rejected(self):
2005 # Ensure the constructor complains if passed a codec that isn't
2006 # marked as a text encoding
2007 # http://bugs.python.org/issue20404
2008 r = self.BytesIO()
2009 b = self.BufferedWriter(r)
2010 with support.check_py3k_warnings():
2011 self.TextIOWrapper(b, encoding="hex_codec")
2012
Antoine Pitrou19690592009-06-12 20:14:08 +00002013 def test_detach(self):
2014 r = self.BytesIO()
2015 b = self.BufferedWriter(r)
2016 t = self.TextIOWrapper(b)
2017 self.assertIs(t.detach(), b)
2018
2019 t = self.TextIOWrapper(b, encoding="ascii")
2020 t.write("howdy")
2021 self.assertFalse(r.getvalue())
2022 t.detach()
2023 self.assertEqual(r.getvalue(), b"howdy")
2024 self.assertRaises(ValueError, t.detach)
2025
Benjamin Peterson53ae6142014-12-21 20:51:50 -06002026 # Operations independent of the detached stream should still work
2027 repr(t)
2028 self.assertEqual(t.encoding, "ascii")
2029 self.assertEqual(t.errors, "strict")
2030 self.assertFalse(t.line_buffering)
2031
Antoine Pitrou19690592009-06-12 20:14:08 +00002032 def test_repr(self):
2033 raw = self.BytesIO("hello".encode("utf-8"))
2034 b = self.BufferedReader(raw)
2035 t = self.TextIOWrapper(b, encoding="utf-8")
2036 modname = self.TextIOWrapper.__module__
2037 self.assertEqual(repr(t),
2038 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2039 raw.name = "dummy"
2040 self.assertEqual(repr(t),
2041 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
2042 raw.name = b"dummy"
2043 self.assertEqual(repr(t),
2044 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
2045
Benjamin Peterson53ae6142014-12-21 20:51:50 -06002046 t.buffer.detach()
2047 repr(t) # Should not raise an exception
2048
Antoine Pitrou19690592009-06-12 20:14:08 +00002049 def test_line_buffering(self):
2050 r = self.BytesIO()
2051 b = self.BufferedWriter(r, 1000)
2052 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
2053 t.write("X")
Ezio Melotti2623a372010-11-21 13:34:58 +00002054 self.assertEqual(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00002055 t.write("Y\nZ")
Ezio Melotti2623a372010-11-21 13:34:58 +00002056 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00002057 t.write("A\rB")
Ezio Melotti2623a372010-11-21 13:34:58 +00002058 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002059
Antoine Pitrou19690592009-06-12 20:14:08 +00002060 def test_encoding(self):
2061 # Check the encoding attribute is always set, and valid
2062 b = self.BytesIO()
2063 t = self.TextIOWrapper(b, encoding="utf8")
2064 self.assertEqual(t.encoding, "utf8")
2065 t = self.TextIOWrapper(b)
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03002066 self.assertIsNotNone(t.encoding)
Antoine Pitrou19690592009-06-12 20:14:08 +00002067 codecs.lookup(t.encoding)
2068
2069 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002070 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00002071 b = self.BytesIO(b"abc\n\xff\n")
2072 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002073 self.assertRaises(UnicodeError, t.read)
2074 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00002075 b = self.BytesIO(b"abc\n\xff\n")
2076 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002077 self.assertRaises(UnicodeError, t.read)
2078 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00002079 b = self.BytesIO(b"abc\n\xff\n")
2080 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti2623a372010-11-21 13:34:58 +00002081 self.assertEqual(t.read(), "abc\n\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002082 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00002083 b = self.BytesIO(b"abc\n\xff\n")
2084 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti2623a372010-11-21 13:34:58 +00002085 self.assertEqual(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002086
Antoine Pitrou19690592009-06-12 20:14:08 +00002087 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002088 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00002089 b = self.BytesIO()
2090 t = self.TextIOWrapper(b, encoding="ascii")
2091 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002092 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00002093 b = self.BytesIO()
2094 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2095 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002096 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00002097 b = self.BytesIO()
2098 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00002099 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00002100 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002101 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002102 self.assertEqual(b.getvalue(), b"abcdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002103 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00002104 b = self.BytesIO()
2105 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00002106 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00002107 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002108 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002109 self.assertEqual(b.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002110
Antoine Pitrou19690592009-06-12 20:14:08 +00002111 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002112 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2113
2114 tests = [
2115 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
2116 [ '', input_lines ],
2117 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2118 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2119 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
2120 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002121 encodings = (
2122 'utf-8', 'latin-1',
2123 'utf-16', 'utf-16-le', 'utf-16-be',
2124 'utf-32', 'utf-32-le', 'utf-32-be',
2125 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00002126
2127 # Try a range of buffer sizes to test the case where \r is the last
2128 # character in TextIOWrapper._pending_line.
2129 for encoding in encodings:
2130 # XXX: str.encode() should return bytes
2131 data = bytes(''.join(input_lines).encode(encoding))
2132 for do_reads in (False, True):
2133 for bufsize in range(1, 10):
2134 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002135 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2136 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00002137 encoding=encoding)
2138 if do_reads:
2139 got_lines = []
2140 while True:
2141 c2 = textio.read(2)
2142 if c2 == '':
2143 break
Ezio Melotti2623a372010-11-21 13:34:58 +00002144 self.assertEqual(len(c2), 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002145 got_lines.append(c2 + textio.readline())
2146 else:
2147 got_lines = list(textio)
2148
2149 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti2623a372010-11-21 13:34:58 +00002150 self.assertEqual(got_line, exp_line)
2151 self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002152
Antoine Pitrou19690592009-06-12 20:14:08 +00002153 def test_newlines_input(self):
2154 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002155 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2156 for newline, expected in [
2157 (None, normalized.decode("ascii").splitlines(True)),
2158 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00002159 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2160 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2161 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00002162 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00002163 buf = self.BytesIO(testdata)
2164 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti2623a372010-11-21 13:34:58 +00002165 self.assertEqual(txt.readlines(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002166 txt.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002167 self.assertEqual(txt.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002168
Antoine Pitrou19690592009-06-12 20:14:08 +00002169 def test_newlines_output(self):
2170 testdict = {
2171 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2172 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2173 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2174 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2175 }
2176 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2177 for newline, expected in tests:
2178 buf = self.BytesIO()
2179 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2180 txt.write("AAA\nB")
2181 txt.write("BB\nCCC\n")
2182 txt.write("X\rY\r\nZ")
2183 txt.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002184 self.assertEqual(buf.closed, False)
2185 self.assertEqual(buf.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00002186
2187 def test_destructor(self):
2188 l = []
2189 base = self.BytesIO
2190 class MyBytesIO(base):
2191 def close(self):
2192 l.append(self.getvalue())
2193 base.close(self)
2194 b = MyBytesIO()
2195 t = self.TextIOWrapper(b, encoding="ascii")
2196 t.write("abc")
2197 del t
2198 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00002199 self.assertEqual([b"abc"], l)
Antoine Pitrou19690592009-06-12 20:14:08 +00002200
2201 def test_override_destructor(self):
2202 record = []
2203 class MyTextIO(self.TextIOWrapper):
2204 def __del__(self):
2205 record.append(1)
2206 try:
2207 f = super(MyTextIO, self).__del__
2208 except AttributeError:
2209 pass
2210 else:
2211 f()
2212 def close(self):
2213 record.append(2)
2214 super(MyTextIO, self).close()
2215 def flush(self):
2216 record.append(3)
2217 super(MyTextIO, self).flush()
2218 b = self.BytesIO()
2219 t = MyTextIO(b, encoding="ascii")
2220 del t
2221 support.gc_collect()
2222 self.assertEqual(record, [1, 2, 3])
2223
2224 def test_error_through_destructor(self):
2225 # Test that the exception state is not modified by a destructor,
2226 # even if close() fails.
2227 rawio = self.CloseFailureIO()
2228 def f():
2229 self.TextIOWrapper(rawio).xyzzy
2230 with support.captured_output("stderr") as s:
2231 self.assertRaises(AttributeError, f)
2232 s = s.getvalue().strip()
2233 if s:
2234 # The destructor *may* have printed an unraisable error, check it
2235 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002236 self.assertTrue(s.startswith("Exception IOError: "), s)
2237 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002238
2239 # Systematic tests of the text I/O API
2240
Antoine Pitrou19690592009-06-12 20:14:08 +00002241 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002242 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2243 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00002244 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002245 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002246 self.assertEqual(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002247 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002248 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002249 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002250 self.assertEqual(f.tell(), 0)
2251 self.assertEqual(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002252 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002253 self.assertEqual(f.seek(0), 0)
2254 self.assertEqual(f.read(None), "abc")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002255 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002256 self.assertEqual(f.read(2), "ab")
2257 self.assertEqual(f.read(1), "c")
2258 self.assertEqual(f.read(1), "")
2259 self.assertEqual(f.read(), "")
2260 self.assertEqual(f.tell(), cookie)
2261 self.assertEqual(f.seek(0), 0)
2262 self.assertEqual(f.seek(0, 2), cookie)
2263 self.assertEqual(f.write("def"), 3)
2264 self.assertEqual(f.seek(cookie), cookie)
2265 self.assertEqual(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002266 if enc.startswith("utf"):
2267 self.multi_line_test(f, enc)
2268 f.close()
2269
2270 def multi_line_test(self, f, enc):
2271 f.seek(0)
2272 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00002273 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002274 wlines = []
2275 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2276 chars = []
2277 for i in range(size):
2278 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00002279 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002280 wlines.append((f.tell(), line))
2281 f.write(line)
2282 f.seek(0)
2283 rlines = []
2284 while True:
2285 pos = f.tell()
2286 line = f.readline()
2287 if not line:
2288 break
2289 rlines.append((pos, line))
Ezio Melotti2623a372010-11-21 13:34:58 +00002290 self.assertEqual(rlines, wlines)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002291
Antoine Pitrou19690592009-06-12 20:14:08 +00002292 def test_telling(self):
2293 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002294 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002295 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002296 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002297 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002298 p2 = f.tell()
2299 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002300 self.assertEqual(f.tell(), p0)
2301 self.assertEqual(f.readline(), "\xff\n")
2302 self.assertEqual(f.tell(), p1)
2303 self.assertEqual(f.readline(), "\xff\n")
2304 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002305 f.seek(0)
2306 for line in f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002307 self.assertEqual(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002308 self.assertRaises(IOError, f.tell)
Ezio Melotti2623a372010-11-21 13:34:58 +00002309 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002310 f.close()
2311
Antoine Pitrou19690592009-06-12 20:14:08 +00002312 def test_seeking(self):
2313 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00002314 prefix_size = chunk_size - 2
2315 u_prefix = "a" * prefix_size
2316 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melotti2623a372010-11-21 13:34:58 +00002317 self.assertEqual(len(u_prefix), len(prefix))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002318 u_suffix = "\u8888\n"
2319 suffix = bytes(u_suffix.encode("utf-8"))
2320 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00002321 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002322 f.write(line*2)
2323 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002324 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002325 s = f.read(prefix_size)
Ezio Melotti2623a372010-11-21 13:34:58 +00002326 self.assertEqual(s, prefix.decode("ascii"))
2327 self.assertEqual(f.tell(), prefix_size)
2328 self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002329
Antoine Pitrou19690592009-06-12 20:14:08 +00002330 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002331 # Regression test for a specific bug
2332 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00002333 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002334 f.write(data)
2335 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002336 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002337 f._CHUNK_SIZE # Just test that it exists
2338 f._CHUNK_SIZE = 2
2339 f.readline()
2340 f.tell()
2341
Antoine Pitrou19690592009-06-12 20:14:08 +00002342 def test_seek_and_tell(self):
2343 #Test seek/tell using the StatefulIncrementalDecoder.
2344 # Make test faster by doing smaller seeks
2345 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00002346
Antoine Pitrou19690592009-06-12 20:14:08 +00002347 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002348 """Tell/seek to various points within a data stream and ensure
2349 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00002350 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00002351 f.write(data)
2352 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002353 f = self.open(support.TESTFN, encoding='test_decoder')
2354 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00002355 decoded = f.read()
2356 f.close()
2357
2358 for i in range(min_pos, len(decoded) + 1): # seek positions
2359 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00002360 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melotti2623a372010-11-21 13:34:58 +00002361 self.assertEqual(f.read(i), decoded[:i])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002362 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002363 self.assertEqual(f.read(j), decoded[i:i + j])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002364 f.seek(cookie)
Ezio Melotti2623a372010-11-21 13:34:58 +00002365 self.assertEqual(f.read(), decoded[i:])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002366 f.close()
2367
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002368 # Enable the test decoder.
2369 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00002370
2371 # Run the tests.
2372 try:
2373 # Try each test case.
2374 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00002375 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002376
2377 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00002378 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2379 offset = CHUNK_SIZE - len(input)//2
2380 prefix = b'.'*offset
2381 # Don't bother seeking into the prefix (takes too long).
2382 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00002383 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002384
2385 # Ensure our test decoder won't interfere with subsequent tests.
2386 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002387 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002388
Antoine Pitrou19690592009-06-12 20:14:08 +00002389 def test_encoded_writes(self):
2390 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002391 tests = ("utf-16",
2392 "utf-16-le",
2393 "utf-16-be",
2394 "utf-32",
2395 "utf-32-le",
2396 "utf-32-be")
2397 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002398 buf = self.BytesIO()
2399 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002400 # Check if the BOM is written only once (see issue1753).
2401 f.write(data)
2402 f.write(data)
2403 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002404 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002405 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002406 self.assertEqual(f.read(), data * 2)
2407 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002408
Antoine Pitrou19690592009-06-12 20:14:08 +00002409 def test_unreadable(self):
2410 class UnReadable(self.BytesIO):
2411 def readable(self):
2412 return False
2413 txt = self.TextIOWrapper(UnReadable())
2414 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002415
Antoine Pitrou19690592009-06-12 20:14:08 +00002416 def test_read_one_by_one(self):
2417 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002418 reads = ""
2419 while True:
2420 c = txt.read(1)
2421 if not c:
2422 break
2423 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002424 self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002425
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002426 def test_readlines(self):
2427 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2428 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2429 txt.seek(0)
2430 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2431 txt.seek(0)
2432 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2433
Christian Heimes1a6387e2008-03-26 12:49:49 +00002434 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002435 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002436 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002437 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002438 reads = ""
2439 while True:
2440 c = txt.read(128)
2441 if not c:
2442 break
2443 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002444 self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002445
Antoine Pitroueadca1d2012-10-16 23:02:27 +02002446 def test_writelines(self):
2447 l = ['ab', 'cd', 'ef']
2448 buf = self.BytesIO()
2449 txt = self.TextIOWrapper(buf)
2450 txt.writelines(l)
2451 txt.flush()
2452 self.assertEqual(buf.getvalue(), b'abcdef')
2453
2454 def test_writelines_userlist(self):
2455 l = UserList(['ab', 'cd', 'ef'])
2456 buf = self.BytesIO()
2457 txt = self.TextIOWrapper(buf)
2458 txt.writelines(l)
2459 txt.flush()
2460 self.assertEqual(buf.getvalue(), b'abcdef')
2461
2462 def test_writelines_error(self):
2463 txt = self.TextIOWrapper(self.BytesIO())
2464 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2465 self.assertRaises(TypeError, txt.writelines, None)
2466 self.assertRaises(TypeError, txt.writelines, b'abc')
2467
Christian Heimes1a6387e2008-03-26 12:49:49 +00002468 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002469 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002470
2471 # read one char at a time
2472 reads = ""
2473 while True:
2474 c = txt.read(1)
2475 if not c:
2476 break
2477 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002478 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002479
2480 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002481 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002482 txt._CHUNK_SIZE = 4
2483
2484 reads = ""
2485 while True:
2486 c = txt.read(4)
2487 if not c:
2488 break
2489 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002490 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002491
2492 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002493 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002494 txt._CHUNK_SIZE = 4
2495
2496 reads = txt.read(4)
2497 reads += txt.read(4)
2498 reads += txt.readline()
2499 reads += txt.readline()
2500 reads += txt.readline()
Ezio Melotti2623a372010-11-21 13:34:58 +00002501 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002502
2503 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002504 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002505 txt._CHUNK_SIZE = 4
2506
2507 reads = txt.read(4)
2508 reads += txt.read()
Ezio Melotti2623a372010-11-21 13:34:58 +00002509 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002510
2511 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002512 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002513 txt._CHUNK_SIZE = 4
2514
2515 reads = txt.read(4)
2516 pos = txt.tell()
2517 txt.seek(0)
2518 txt.seek(pos)
Ezio Melotti2623a372010-11-21 13:34:58 +00002519 self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002520
2521 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002522 buffer = self.BytesIO(self.testdata)
2523 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002524
2525 self.assertEqual(buffer.seekable(), txt.seekable())
2526
Antoine Pitrou19690592009-06-12 20:14:08 +00002527 def test_append_bom(self):
2528 # The BOM is not written again when appending to a non-empty file
2529 filename = support.TESTFN
2530 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2531 with self.open(filename, 'w', encoding=charset) as f:
2532 f.write('aaa')
2533 pos = f.tell()
2534 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002535 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002536
2537 with self.open(filename, 'a', encoding=charset) as f:
2538 f.write('xxx')
2539 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002540 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002541
Antoine Pitrou19690592009-06-12 20:14:08 +00002542 def test_seek_bom(self):
2543 # Same test, but when seeking manually
2544 filename = support.TESTFN
2545 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2546 with self.open(filename, 'w', encoding=charset) as f:
2547 f.write('aaa')
2548 pos = f.tell()
2549 with self.open(filename, 'r+', encoding=charset) as f:
2550 f.seek(pos)
2551 f.write('zzz')
2552 f.seek(0)
2553 f.write('bbb')
2554 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002555 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002556
2557 def test_errors_property(self):
2558 with self.open(support.TESTFN, "w") as f:
2559 self.assertEqual(f.errors, "strict")
2560 with self.open(support.TESTFN, "w", errors="replace") as f:
2561 self.assertEqual(f.errors, "replace")
2562
Victor Stinner6a102812010-04-27 23:55:59 +00002563 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002564 def test_threads_write(self):
2565 # Issue6750: concurrent writes could duplicate data
2566 event = threading.Event()
2567 with self.open(support.TESTFN, "w", buffering=1) as f:
2568 def run(n):
2569 text = "Thread%03d\n" % n
2570 event.wait()
2571 f.write(text)
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03002572 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002573 for x in range(20)]
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03002574 with support.start_threads(threads, event.set):
2575 time.sleep(0.02)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002576 with self.open(support.TESTFN) as f:
2577 content = f.read()
2578 for n in range(20):
Ezio Melotti2623a372010-11-21 13:34:58 +00002579 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002580
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002581 def test_flush_error_on_close(self):
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002582 # Test that text file is closed despite failed flush
2583 # and that flush() is called before file closed.
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002584 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002585 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002586 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002587 closed[:] = [txt.closed, txt.buffer.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002588 raise IOError()
2589 txt.flush = bad_flush
2590 self.assertRaises(IOError, txt.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -06002591 self.assertTrue(txt.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002592 self.assertTrue(txt.buffer.closed)
2593 self.assertTrue(closed) # flush() called
2594 self.assertFalse(closed[0]) # flush() called before file closed
2595 self.assertFalse(closed[1])
Serhiy Storchaka437d5352015-02-23 00:28:38 +02002596 txt.flush = lambda: None # break reference loop
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002597
2598 def test_multi_close(self):
2599 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2600 txt.close()
2601 txt.close()
2602 txt.close()
2603 self.assertRaises(ValueError, txt.flush)
2604
Antoine Pitroufc9ead62010-12-21 21:26:55 +00002605 def test_readonly_attributes(self):
2606 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2607 buf = self.BytesIO(self.testdata)
2608 with self.assertRaises((AttributeError, TypeError)):
2609 txt.buffer = buf
2610
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002611 def test_read_nonbytes(self):
2612 # Issue #17106
2613 # Crash when underlying read() returns non-bytes
2614 class NonbytesStream(self.StringIO):
2615 read1 = self.StringIO.read
2616 class NonbytesStream(self.StringIO):
2617 read1 = self.StringIO.read
2618 t = self.TextIOWrapper(NonbytesStream('a'))
2619 with self.maybeRaises(TypeError):
2620 t.read(1)
2621 t = self.TextIOWrapper(NonbytesStream('a'))
2622 with self.maybeRaises(TypeError):
2623 t.readline()
2624 t = self.TextIOWrapper(NonbytesStream('a'))
2625 self.assertEqual(t.read(), u'a')
2626
2627 def test_illegal_decoder(self):
2628 # Issue #17106
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002629 # Bypass the early encoding check added in issue 20404
2630 def _make_illegal_wrapper():
2631 quopri = codecs.lookup("quopri_codec")
2632 quopri._is_text_encoding = True
2633 try:
2634 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
2635 newline='\n', encoding="quopri_codec")
2636 finally:
2637 quopri._is_text_encoding = False
2638 return t
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002639 # Crash when decoder returns non-string
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002640 with support.check_py3k_warnings():
2641 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2642 encoding='quopri_codec')
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002643 with self.maybeRaises(TypeError):
2644 t.read(1)
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002645 with support.check_py3k_warnings():
2646 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2647 encoding='quopri_codec')
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002648 with self.maybeRaises(TypeError):
2649 t.readline()
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002650 with support.check_py3k_warnings():
2651 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2652 encoding='quopri_codec')
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002653 with self.maybeRaises(TypeError):
2654 t.read()
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03002655 #else:
2656 #t = _make_illegal_wrapper()
2657 #self.assertRaises(TypeError, t.read, 1)
2658 #t = _make_illegal_wrapper()
2659 #self.assertRaises(TypeError, t.readline)
2660 #t = _make_illegal_wrapper()
2661 #self.assertRaises(TypeError, t.read)
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002662
2663
Antoine Pitrou19690592009-06-12 20:14:08 +00002664class CTextIOWrapperTest(TextIOWrapperTest):
2665
2666 def test_initialization(self):
2667 r = self.BytesIO(b"\xc3\xa9\n\n")
2668 b = self.BufferedReader(r, 1000)
2669 t = self.TextIOWrapper(b)
2670 self.assertRaises(TypeError, t.__init__, b, newline=42)
2671 self.assertRaises(ValueError, t.read)
2672 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2673 self.assertRaises(ValueError, t.read)
2674
Benjamin Peterson53ae6142014-12-21 20:51:50 -06002675 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2676 self.assertRaises(Exception, repr, t)
2677
Antoine Pitrou19690592009-06-12 20:14:08 +00002678 def test_garbage_collection(self):
2679 # C TextIOWrapper objects are collected, and collecting them flushes
2680 # all data to disk.
2681 # The Python version has __del__, so it ends in gc.garbage instead.
2682 rawio = io.FileIO(support.TESTFN, "wb")
2683 b = self.BufferedWriter(rawio)
2684 t = self.TextIOWrapper(b, encoding="ascii")
2685 t.write("456def")
2686 t.x = t
2687 wr = weakref.ref(t)
2688 del t
2689 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03002690 self.assertIsNone(wr(), wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002691 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002692 self.assertEqual(f.read(), b"456def")
2693
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002694 def test_rwpair_cleared_before_textio(self):
2695 # Issue 13070: TextIOWrapper's finalization would crash when called
2696 # after the reference to the underlying BufferedRWPair's writer got
2697 # cleared by the GC.
2698 for i in range(1000):
2699 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2700 t1 = self.TextIOWrapper(b1, encoding="ascii")
2701 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2702 t2 = self.TextIOWrapper(b2, encoding="ascii")
2703 # circular references
2704 t1.buddy = t2
2705 t2.buddy = t1
2706 support.gc_collect()
2707
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002708 maybeRaises = unittest.TestCase.assertRaises
2709
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002710
Antoine Pitrou19690592009-06-12 20:14:08 +00002711class PyTextIOWrapperTest(TextIOWrapperTest):
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002712 @contextlib.contextmanager
2713 def maybeRaises(self, *args, **kwds):
2714 yield
Antoine Pitrou19690592009-06-12 20:14:08 +00002715
2716
2717class IncrementalNewlineDecoderTest(unittest.TestCase):
2718
2719 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002720 # UTF-8 specific tests for a newline decoder
2721 def _check_decode(b, s, **kwargs):
2722 # We exercise getstate() / setstate() as well as decode()
2723 state = decoder.getstate()
Ezio Melotti2623a372010-11-21 13:34:58 +00002724 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002725 decoder.setstate(state)
Ezio Melotti2623a372010-11-21 13:34:58 +00002726 self.assertEqual(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002727
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002728 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002729
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002730 _check_decode(b'\xe8', "")
2731 _check_decode(b'\xa2', "")
2732 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002733
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002734 _check_decode(b'\xe8', "")
2735 _check_decode(b'\xa2', "")
2736 _check_decode(b'\x88', "\u8888")
2737
2738 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002739 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2740
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002741 decoder.reset()
2742 _check_decode(b'\n', "\n")
2743 _check_decode(b'\r', "")
2744 _check_decode(b'', "\n", final=True)
2745 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002746
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002747 _check_decode(b'\r', "")
2748 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002749
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002750 _check_decode(b'\r\r\n', "\n\n")
2751 _check_decode(b'\r', "")
2752 _check_decode(b'\r', "\n")
2753 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002754
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002755 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2756 _check_decode(b'\xe8\xa2\x88', "\u8888")
2757 _check_decode(b'\n', "\n")
2758 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2759 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002760
Antoine Pitrou19690592009-06-12 20:14:08 +00002761 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002762 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002763 if encoding is not None:
2764 encoder = codecs.getincrementalencoder(encoding)()
2765 def _decode_bytewise(s):
2766 # Decode one byte at a time
2767 for b in encoder.encode(s):
2768 result.append(decoder.decode(b))
2769 else:
2770 encoder = None
2771 def _decode_bytewise(s):
2772 # Decode one char at a time
2773 for c in s:
2774 result.append(decoder.decode(c))
Ezio Melotti2623a372010-11-21 13:34:58 +00002775 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002776 _decode_bytewise("abc\n\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002777 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002778 _decode_bytewise("\nabc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002779 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002780 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002781 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002782 _decode_bytewise("abc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002783 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002784 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002785 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002786 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002787 input = "abc"
2788 if encoder is not None:
2789 encoder.reset()
2790 input = encoder.encode(input)
Ezio Melotti2623a372010-11-21 13:34:58 +00002791 self.assertEqual(decoder.decode(input), "abc")
2792 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002793
2794 def test_newline_decoder(self):
2795 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002796 # None meaning the IncrementalNewlineDecoder takes unicode input
2797 # rather than bytes input
2798 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002799 'utf-16', 'utf-16-le', 'utf-16-be',
2800 'utf-32', 'utf-32-le', 'utf-32-be',
2801 )
2802 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002803 decoder = enc and codecs.getincrementaldecoder(enc)()
2804 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2805 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002806 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002807 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2808 self.check_newline_decoding_utf8(decoder)
2809
2810 def test_newline_bytes(self):
2811 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2812 def _check(dec):
Ezio Melotti2623a372010-11-21 13:34:58 +00002813 self.assertEqual(dec.newlines, None)
2814 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2815 self.assertEqual(dec.newlines, None)
2816 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2817 self.assertEqual(dec.newlines, None)
Antoine Pitrou19690592009-06-12 20:14:08 +00002818 dec = self.IncrementalNewlineDecoder(None, translate=False)
2819 _check(dec)
2820 dec = self.IncrementalNewlineDecoder(None, translate=True)
2821 _check(dec)
2822
2823class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2824 pass
2825
2826class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2827 pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002828
Christian Heimes1a6387e2008-03-26 12:49:49 +00002829
2830# XXX Tests for open()
2831
2832class MiscIOTest(unittest.TestCase):
2833
Benjamin Petersonad100c32008-11-20 22:06:22 +00002834 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002835 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002836
Antoine Pitrou19690592009-06-12 20:14:08 +00002837 def test___all__(self):
2838 for name in self.io.__all__:
2839 obj = getattr(self.io, name, None)
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03002840 self.assertIsNotNone(obj, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002841 if name == "open":
2842 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002843 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002844 self.assertTrue(issubclass(obj, Exception), name)
2845 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002846 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002847
Benjamin Petersonad100c32008-11-20 22:06:22 +00002848 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002849 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002850 self.assertEqual(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002851 f.close()
2852
Antoine Pitrou19690592009-06-12 20:14:08 +00002853 f = self.open(support.TESTFN, "U")
Ezio Melotti2623a372010-11-21 13:34:58 +00002854 self.assertEqual(f.name, support.TESTFN)
2855 self.assertEqual(f.buffer.name, support.TESTFN)
2856 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2857 self.assertEqual(f.mode, "U")
2858 self.assertEqual(f.buffer.mode, "rb")
2859 self.assertEqual(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002860 f.close()
2861
Antoine Pitrou19690592009-06-12 20:14:08 +00002862 f = self.open(support.TESTFN, "w+")
Ezio Melotti2623a372010-11-21 13:34:58 +00002863 self.assertEqual(f.mode, "w+")
2864 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2865 self.assertEqual(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002866
Antoine Pitrou19690592009-06-12 20:14:08 +00002867 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melotti2623a372010-11-21 13:34:58 +00002868 self.assertEqual(g.mode, "wb")
2869 self.assertEqual(g.raw.mode, "wb")
2870 self.assertEqual(g.name, f.fileno())
2871 self.assertEqual(g.raw.name, f.fileno())
Benjamin Petersonad100c32008-11-20 22:06:22 +00002872 f.close()
2873 g.close()
2874
Antoine Pitrou19690592009-06-12 20:14:08 +00002875 def test_io_after_close(self):
2876 for kwargs in [
2877 {"mode": "w"},
2878 {"mode": "wb"},
2879 {"mode": "w", "buffering": 1},
2880 {"mode": "w", "buffering": 2},
2881 {"mode": "wb", "buffering": 0},
2882 {"mode": "r"},
2883 {"mode": "rb"},
2884 {"mode": "r", "buffering": 1},
2885 {"mode": "r", "buffering": 2},
2886 {"mode": "rb", "buffering": 0},
2887 {"mode": "w+"},
2888 {"mode": "w+b"},
2889 {"mode": "w+", "buffering": 1},
2890 {"mode": "w+", "buffering": 2},
2891 {"mode": "w+b", "buffering": 0},
2892 ]:
2893 f = self.open(support.TESTFN, **kwargs)
2894 f.close()
2895 self.assertRaises(ValueError, f.flush)
2896 self.assertRaises(ValueError, f.fileno)
2897 self.assertRaises(ValueError, f.isatty)
2898 self.assertRaises(ValueError, f.__iter__)
2899 if hasattr(f, "peek"):
2900 self.assertRaises(ValueError, f.peek, 1)
2901 self.assertRaises(ValueError, f.read)
2902 if hasattr(f, "read1"):
2903 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinner5100a402011-05-25 22:15:36 +02002904 if hasattr(f, "readall"):
2905 self.assertRaises(ValueError, f.readall)
Antoine Pitrou19690592009-06-12 20:14:08 +00002906 if hasattr(f, "readinto"):
2907 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2908 self.assertRaises(ValueError, f.readline)
2909 self.assertRaises(ValueError, f.readlines)
2910 self.assertRaises(ValueError, f.seek, 0)
2911 self.assertRaises(ValueError, f.tell)
2912 self.assertRaises(ValueError, f.truncate)
2913 self.assertRaises(ValueError, f.write,
2914 b"" if "b" in kwargs['mode'] else "")
2915 self.assertRaises(ValueError, f.writelines, [])
2916 self.assertRaises(ValueError, next, f)
2917
2918 def test_blockingioerror(self):
2919 # Various BlockingIOError issues
2920 self.assertRaises(TypeError, self.BlockingIOError)
2921 self.assertRaises(TypeError, self.BlockingIOError, 1)
2922 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2923 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2924 b = self.BlockingIOError(1, "")
2925 self.assertEqual(b.characters_written, 0)
2926 class C(unicode):
2927 pass
2928 c = C("")
2929 b = self.BlockingIOError(1, c)
2930 c.b = b
2931 b.c = c
2932 wr = weakref.ref(c)
2933 del c, b
2934 support.gc_collect()
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03002935 self.assertIsNone(wr(), wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002936
2937 def test_abcs(self):
2938 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002939 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2940 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2941 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2942 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002943
2944 def _check_abc_inheritance(self, abcmodule):
2945 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002946 self.assertIsInstance(f, abcmodule.IOBase)
2947 self.assertIsInstance(f, abcmodule.RawIOBase)
2948 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2949 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002950 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002951 self.assertIsInstance(f, abcmodule.IOBase)
2952 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2953 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2954 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002955 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002956 self.assertIsInstance(f, abcmodule.IOBase)
2957 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2958 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2959 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002960
2961 def test_abc_inheritance(self):
2962 # Test implementations inherit from their respective ABCs
2963 self._check_abc_inheritance(self)
2964
2965 def test_abc_inheritance_official(self):
2966 # Test implementations inherit from the official ABCs of the
2967 # baseline "io" module.
2968 self._check_abc_inheritance(io)
2969
Antoine Pitrou5aa7df32011-11-21 20:16:44 +01002970 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2971 def test_nonblock_pipe_write_bigbuf(self):
2972 self._test_nonblock_pipe_write(16*1024)
2973
2974 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2975 def test_nonblock_pipe_write_smallbuf(self):
2976 self._test_nonblock_pipe_write(1024)
2977
2978 def _set_non_blocking(self, fd):
2979 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2980 self.assertNotEqual(flags, -1)
2981 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2982 self.assertEqual(res, 0)
2983
2984 def _test_nonblock_pipe_write(self, bufsize):
2985 sent = []
2986 received = []
2987 r, w = os.pipe()
2988 self._set_non_blocking(r)
2989 self._set_non_blocking(w)
2990
2991 # To exercise all code paths in the C implementation we need
2992 # to play with buffer sizes. For instance, if we choose a
2993 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2994 # then we will never get a partial write of the buffer.
2995 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2996 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2997
2998 with rf, wf:
2999 for N in 9999, 73, 7574:
3000 try:
3001 i = 0
3002 while True:
3003 msg = bytes([i % 26 + 97]) * N
3004 sent.append(msg)
3005 wf.write(msg)
3006 i += 1
3007
3008 except self.BlockingIOError as e:
3009 self.assertEqual(e.args[0], errno.EAGAIN)
3010 sent[-1] = sent[-1][:e.characters_written]
3011 received.append(rf.read())
3012 msg = b'BLOCKED'
3013 wf.write(msg)
3014 sent.append(msg)
3015
3016 while True:
3017 try:
3018 wf.flush()
3019 break
3020 except self.BlockingIOError as e:
3021 self.assertEqual(e.args[0], errno.EAGAIN)
3022 self.assertEqual(e.characters_written, 0)
3023 received.append(rf.read())
3024
3025 received += iter(rf.read, None)
3026
3027 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchakaea4d2872015-08-02 15:19:04 +03003028 self.assertEqual(sent, received)
Antoine Pitrou5aa7df32011-11-21 20:16:44 +01003029 self.assertTrue(wf.closed)
3030 self.assertTrue(rf.closed)
3031
Antoine Pitrou19690592009-06-12 20:14:08 +00003032class CMiscIOTest(MiscIOTest):
3033 io = io
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03003034 shutdown_error = "RuntimeError: could not find io module state"
Antoine Pitrou19690592009-06-12 20:14:08 +00003035
3036class PyMiscIOTest(MiscIOTest):
3037 io = pyio
Serhiy Storchakac7797dc2015-05-31 20:21:00 +03003038 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Petersonad100c32008-11-20 22:06:22 +00003039
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003040
3041@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3042class SignalsTest(unittest.TestCase):
3043
3044 def setUp(self):
3045 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3046
3047 def tearDown(self):
3048 signal.signal(signal.SIGALRM, self.oldalrm)
3049
3050 def alarm_interrupt(self, sig, frame):
Florent Xicluna3fa3b002010-09-13 08:20:19 +00003051 1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003052
3053 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinner49d495f2011-07-04 11:44:46 +02003054 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
3055 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003056 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3057 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou6439c002011-02-25 21:35:47 +00003058 invokes the signal handler, and bubbles up the exception raised
3059 in the latter."""
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003060 read_results = []
3061 def _read():
3062 s = os.read(r, 1)
3063 read_results.append(s)
3064 t = threading.Thread(target=_read)
3065 t.daemon = True
3066 r, w = os.pipe()
3067 try:
3068 wio = self.io.open(w, **fdopen_kwargs)
3069 t.start()
3070 signal.alarm(1)
3071 # Fill the pipe enough that the write will be blocking.
3072 # It will be interrupted by the timer armed above. Since the
3073 # other thread has read one byte, the low-level write will
3074 # return with a successful (partial) result rather than an EINTR.
3075 # The buffered IO layer must check for pending signal
3076 # handlers, which in this case will invoke alarm_interrupt().
Serhiy Storchakabd8c6292015-04-01 12:56:39 +03003077 try:
3078 with self.assertRaises(ZeroDivisionError):
3079 wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1))
3080 finally:
3081 t.join()
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003082 # We got one byte, get another one and check that it isn't a
3083 # repeat of the first one.
3084 read_results.append(os.read(r, 1))
3085 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3086 finally:
3087 os.close(w)
3088 os.close(r)
3089 # This is deliberate. If we didn't close the file descriptor
3090 # before closing wio, wio would try to flush its internal
3091 # buffer, and block again.
3092 try:
3093 wio.close()
3094 except IOError as e:
3095 if e.errno != errno.EBADF:
3096 raise
3097
3098 def test_interrupted_write_unbuffered(self):
3099 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3100
3101 def test_interrupted_write_buffered(self):
3102 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3103
3104 def test_interrupted_write_text(self):
3105 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3106
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003107 def check_reentrant_write(self, data, **fdopen_kwargs):
3108 def on_alarm(*args):
3109 # Will be called reentrantly from the same thread
3110 wio.write(data)
Victor Stinner4c41f842011-07-05 13:29:26 +02003111 1//0
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003112 signal.signal(signal.SIGALRM, on_alarm)
3113 r, w = os.pipe()
3114 wio = self.io.open(w, **fdopen_kwargs)
3115 try:
3116 signal.alarm(1)
3117 # Either the reentrant call to wio.write() fails with RuntimeError,
3118 # or the signal handler raises ZeroDivisionError.
3119 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3120 while 1:
3121 for i in range(100):
3122 wio.write(data)
3123 wio.flush()
3124 # Make sure the buffer doesn't fill up and block further writes
3125 os.read(r, len(data) * 100)
3126 exc = cm.exception
3127 if isinstance(exc, RuntimeError):
3128 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3129 finally:
3130 wio.close()
3131 os.close(r)
3132
3133 def test_reentrant_write_buffered(self):
3134 self.check_reentrant_write(b"xy", mode="wb")
3135
3136 def test_reentrant_write_text(self):
3137 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3138
Antoine Pitrou6439c002011-02-25 21:35:47 +00003139 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3140 """Check that a buffered read, when it gets interrupted (either
3141 returning a partial result or EINTR), properly invokes the signal
3142 handler and retries if the latter returned successfully."""
3143 r, w = os.pipe()
3144 fdopen_kwargs["closefd"] = False
3145 def alarm_handler(sig, frame):
3146 os.write(w, b"bar")
3147 signal.signal(signal.SIGALRM, alarm_handler)
3148 try:
3149 rio = self.io.open(r, **fdopen_kwargs)
3150 os.write(w, b"foo")
3151 signal.alarm(1)
3152 # Expected behaviour:
3153 # - first raw read() returns partial b"foo"
3154 # - second raw read() returns EINTR
3155 # - third raw read() returns b"bar"
3156 self.assertEqual(decode(rio.read(6)), "foobar")
3157 finally:
3158 rio.close()
3159 os.close(w)
3160 os.close(r)
3161
3162 def test_interrupterd_read_retry_buffered(self):
3163 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3164 mode="rb")
3165
3166 def test_interrupterd_read_retry_text(self):
3167 self.check_interrupted_read_retry(lambda x: x,
3168 mode="r")
3169
3170 @unittest.skipUnless(threading, 'Threading required for this test.')
3171 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3172 """Check that a buffered write, when it gets interrupted (either
3173 returning a partial result or EINTR), properly invokes the signal
3174 handler and retries if the latter returned successfully."""
3175 select = support.import_module("select")
3176 # A quantity that exceeds the buffer size of an anonymous pipe's
3177 # write end.
Antoine Pitrou68915d72013-04-24 23:31:38 +02003178 N = support.PIPE_MAX_SIZE
Antoine Pitrou6439c002011-02-25 21:35:47 +00003179 r, w = os.pipe()
3180 fdopen_kwargs["closefd"] = False
3181 # We need a separate thread to read from the pipe and allow the
3182 # write() to finish. This thread is started after the SIGALRM is
3183 # received (forcing a first EINTR in write()).
3184 read_results = []
3185 write_finished = False
Serhiy Storchaka53ea1622015-03-28 20:38:48 +02003186 error = [None]
Antoine Pitrou6439c002011-02-25 21:35:47 +00003187 def _read():
Serhiy Storchaka53ea1622015-03-28 20:38:48 +02003188 try:
3189 while not write_finished:
3190 while r in select.select([r], [], [], 1.0)[0]:
3191 s = os.read(r, 1024)
3192 read_results.append(s)
3193 except BaseException as exc:
3194 error[0] = exc
Antoine Pitrou6439c002011-02-25 21:35:47 +00003195 t = threading.Thread(target=_read)
3196 t.daemon = True
3197 def alarm1(sig, frame):
3198 signal.signal(signal.SIGALRM, alarm2)
3199 signal.alarm(1)
3200 def alarm2(sig, frame):
3201 t.start()
3202 signal.signal(signal.SIGALRM, alarm1)
3203 try:
3204 wio = self.io.open(w, **fdopen_kwargs)
3205 signal.alarm(1)
3206 # Expected behaviour:
3207 # - first raw write() is partial (because of the limited pipe buffer
3208 # and the first alarm)
3209 # - second raw write() returns EINTR (because of the second alarm)
3210 # - subsequent write()s are successful (either partial or complete)
3211 self.assertEqual(N, wio.write(item * N))
3212 wio.flush()
3213 write_finished = True
3214 t.join()
Serhiy Storchaka53ea1622015-03-28 20:38:48 +02003215
3216 self.assertIsNone(error[0])
Antoine Pitrou6439c002011-02-25 21:35:47 +00003217 self.assertEqual(N, sum(len(x) for x in read_results))
3218 finally:
3219 write_finished = True
3220 os.close(w)
3221 os.close(r)
3222 # This is deliberate. If we didn't close the file descriptor
3223 # before closing wio, wio would try to flush its internal
3224 # buffer, and could block (in case of failure).
3225 try:
3226 wio.close()
3227 except IOError as e:
3228 if e.errno != errno.EBADF:
3229 raise
3230
3231 def test_interrupterd_write_retry_buffered(self):
3232 self.check_interrupted_write_retry(b"x", mode="wb")
3233
3234 def test_interrupterd_write_retry_text(self):
3235 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3236
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003237
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003238class CSignalsTest(SignalsTest):
3239 io = io
3240
3241class PySignalsTest(SignalsTest):
3242 io = pyio
3243
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003244 # Handling reentrancy issues would slow down _pyio even more, so the
3245 # tests are disabled.
3246 test_reentrant_write_buffered = None
3247 test_reentrant_write_text = None
3248
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003249
Christian Heimes1a6387e2008-03-26 12:49:49 +00003250def test_main():
Antoine Pitrou19690592009-06-12 20:14:08 +00003251 tests = (CIOTest, PyIOTest,
3252 CBufferedReaderTest, PyBufferedReaderTest,
3253 CBufferedWriterTest, PyBufferedWriterTest,
3254 CBufferedRWPairTest, PyBufferedRWPairTest,
3255 CBufferedRandomTest, PyBufferedRandomTest,
3256 StatefulIncrementalDecoderTest,
3257 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3258 CTextIOWrapperTest, PyTextIOWrapperTest,
3259 CMiscIOTest, PyMiscIOTest,
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003260 CSignalsTest, PySignalsTest,
Antoine Pitrou19690592009-06-12 20:14:08 +00003261 )
3262
3263 # Put the namespaces of the IO module we are testing and some useful mock
3264 # classes in the __dict__ of each test.
3265 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou6391b342010-09-14 18:48:19 +00003266 MockNonBlockWriterIO, MockRawIOWithoutRead)
Antoine Pitrou19690592009-06-12 20:14:08 +00003267 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3268 c_io_ns = dict((name, getattr(io, name)) for name in all_members)
3269 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
3270 globs = globals()
3271 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3272 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3273 # Avoid turning open into a bound method.
3274 py_io_ns["open"] = pyio.OpenWrapper
3275 for test in tests:
3276 if test.__name__.startswith("C"):
3277 for name, obj in c_io_ns.items():
3278 setattr(test, name, obj)
3279 elif test.__name__.startswith("Py"):
3280 for name, obj in py_io_ns.items():
3281 setattr(test, name, obj)
3282
3283 support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00003284
3285if __name__ == "__main__":
Antoine Pitrou19690592009-06-12 20:14:08 +00003286 test_main()