blob: 3769a028f819fc7df232c059d1be2e0ddc5a4635 [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Antoine Pitrou19690592009-06-12 20:14:08 +000032import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000033import signal
34import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000035from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000036from collections import deque
Antoine Pitrou78e761e2012-10-16 22:57:11 +020037from UserList import UserList
Antoine Pitrou19690592009-06-12 20:14:08 +000038from test import test_support as support
Serhiy Storchaka354d50e2013-02-03 17:10:42 +020039import contextlib
Christian Heimes1a6387e2008-03-26 12:49:49 +000040
41import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000042import io # C implementation of io
43import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000044try:
45 import threading
46except ImportError:
47 threading = None
Antoine Pitrou5aa7df32011-11-21 20:16:44 +010048try:
49 import fcntl
50except ImportError:
51 fcntl = None
Antoine Pitrou19690592009-06-12 20:14:08 +000052
53__metaclass__ = type
54bytes = support.py3k_bytes
55
56def _default_chunk_size():
57 """Get the default TextIOWrapper chunk size"""
58 with io.open(__file__, "r", encoding="latin1") as f:
59 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000060
61
Antoine Pitrou6391b342010-09-14 18:48:19 +000062class MockRawIOWithoutRead:
63 """A RawIO implementation without read(), so as to exercise the default
64 RawIO.read() which calls readinto()."""
Christian Heimes1a6387e2008-03-26 12:49:49 +000065
66 def __init__(self, read_stack=()):
67 self._read_stack = list(read_stack)
68 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +000069 self._reads = 0
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000070 self._extraneous_reads = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +000071
Christian Heimes1a6387e2008-03-26 12:49:49 +000072 def write(self, b):
Antoine Pitrou19690592009-06-12 20:14:08 +000073 self._write_stack.append(bytes(b))
Christian Heimes1a6387e2008-03-26 12:49:49 +000074 return len(b)
75
76 def writable(self):
77 return True
78
79 def fileno(self):
80 return 42
81
82 def readable(self):
83 return True
84
85 def seekable(self):
86 return True
87
88 def seek(self, pos, whence):
Antoine Pitrou19690592009-06-12 20:14:08 +000089 return 0 # wrong but we gotta return something
Christian Heimes1a6387e2008-03-26 12:49:49 +000090
91 def tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +000092 return 0 # same comment as above
93
94 def readinto(self, buf):
95 self._reads += 1
96 max_len = len(buf)
97 try:
98 data = self._read_stack[0]
99 except IndexError:
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000100 self._extraneous_reads += 1
Antoine Pitrou19690592009-06-12 20:14:08 +0000101 return 0
102 if data is None:
103 del self._read_stack[0]
104 return None
105 n = len(data)
106 if len(data) <= max_len:
107 del self._read_stack[0]
108 buf[:n] = data
109 return n
110 else:
111 buf[:] = data[:max_len]
112 self._read_stack[0] = data[max_len:]
113 return max_len
114
115 def truncate(self, pos=None):
116 return pos
117
Antoine Pitrou6391b342010-09-14 18:48:19 +0000118class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
119 pass
120
121class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
122 pass
123
124
125class MockRawIO(MockRawIOWithoutRead):
126
127 def read(self, n=None):
128 self._reads += 1
129 try:
130 return self._read_stack.pop(0)
131 except:
132 self._extraneous_reads += 1
133 return b""
134
Antoine Pitrou19690592009-06-12 20:14:08 +0000135class CMockRawIO(MockRawIO, io.RawIOBase):
136 pass
137
138class PyMockRawIO(MockRawIO, pyio.RawIOBase):
139 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000140
141
Antoine Pitrou19690592009-06-12 20:14:08 +0000142class MisbehavedRawIO(MockRawIO):
143 def write(self, b):
144 return MockRawIO.write(self, b) * 2
145
146 def read(self, n=None):
147 return MockRawIO.read(self, n) * 2
148
149 def seek(self, pos, whence):
150 return -123
151
152 def tell(self):
153 return -456
154
155 def readinto(self, buf):
156 MockRawIO.readinto(self, buf)
157 return len(buf) * 5
158
159class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
160 pass
161
162class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
163 pass
164
165
166class CloseFailureIO(MockRawIO):
167 closed = 0
168
169 def close(self):
170 if not self.closed:
171 self.closed = 1
172 raise IOError
173
174class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
175 pass
176
177class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
178 pass
179
180
181class MockFileIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000182
183 def __init__(self, data):
184 self.read_history = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000185 super(MockFileIO, self).__init__(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000186
187 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +0000188 res = super(MockFileIO, self).read(n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000189 self.read_history.append(None if res is None else len(res))
190 return res
191
Antoine Pitrou19690592009-06-12 20:14:08 +0000192 def readinto(self, b):
193 res = super(MockFileIO, self).readinto(b)
194 self.read_history.append(res)
195 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000196
Antoine Pitrou19690592009-06-12 20:14:08 +0000197class CMockFileIO(MockFileIO, io.BytesIO):
198 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000199
Antoine Pitrou19690592009-06-12 20:14:08 +0000200class PyMockFileIO(MockFileIO, pyio.BytesIO):
201 pass
202
203
204class MockNonBlockWriterIO:
205
206 def __init__(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000207 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000208 self._blocker_char = None
Christian Heimes1a6387e2008-03-26 12:49:49 +0000209
Antoine Pitrou19690592009-06-12 20:14:08 +0000210 def pop_written(self):
211 s = b"".join(self._write_stack)
212 self._write_stack[:] = []
213 return s
214
215 def block_on(self, char):
216 """Block when a given char is encountered."""
217 self._blocker_char = char
218
219 def readable(self):
220 return True
221
222 def seekable(self):
223 return True
Christian Heimes1a6387e2008-03-26 12:49:49 +0000224
225 def writable(self):
226 return True
227
Antoine Pitrou19690592009-06-12 20:14:08 +0000228 def write(self, b):
229 b = bytes(b)
230 n = -1
231 if self._blocker_char:
232 try:
233 n = b.index(self._blocker_char)
234 except ValueError:
235 pass
236 else:
Antoine Pitrou5aa7df32011-11-21 20:16:44 +0100237 if n > 0:
238 # write data up to the first blocker
239 self._write_stack.append(b[:n])
240 return n
241 else:
242 # cancel blocker and indicate would block
243 self._blocker_char = None
244 return None
Antoine Pitrou19690592009-06-12 20:14:08 +0000245 self._write_stack.append(b)
246 return len(b)
247
248class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
249 BlockingIOError = io.BlockingIOError
250
251class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
252 BlockingIOError = pyio.BlockingIOError
253
Christian Heimes1a6387e2008-03-26 12:49:49 +0000254
255class IOTest(unittest.TestCase):
256
Antoine Pitrou19690592009-06-12 20:14:08 +0000257 def setUp(self):
258 support.unlink(support.TESTFN)
259
Christian Heimes1a6387e2008-03-26 12:49:49 +0000260 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000261 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000262
263 def write_ops(self, f):
264 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000265 f.truncate(0)
266 self.assertEqual(f.tell(), 5)
267 f.seek(0)
268
269 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000270 self.assertEqual(f.seek(0), 0)
271 self.assertEqual(f.write(b"Hello."), 6)
272 self.assertEqual(f.tell(), 6)
273 self.assertEqual(f.seek(-1, 1), 5)
274 self.assertEqual(f.tell(), 5)
275 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
276 self.assertEqual(f.seek(0), 0)
277 self.assertEqual(f.write(b"h"), 1)
278 self.assertEqual(f.seek(-1, 2), 13)
279 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000280
Christian Heimes1a6387e2008-03-26 12:49:49 +0000281 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000282 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000283 self.assertRaises(TypeError, f.seek, 0.0)
284
285 def read_ops(self, f, buffered=False):
286 data = f.read(5)
287 self.assertEqual(data, b"hello")
288 data = bytearray(data)
289 self.assertEqual(f.readinto(data), 5)
290 self.assertEqual(data, b" worl")
291 self.assertEqual(f.readinto(data), 2)
292 self.assertEqual(len(data), 5)
293 self.assertEqual(data[:2], b"d\n")
294 self.assertEqual(f.seek(0), 0)
295 self.assertEqual(f.read(20), b"hello world\n")
296 self.assertEqual(f.read(1), b"")
297 self.assertEqual(f.readinto(bytearray(b"x")), 0)
298 self.assertEqual(f.seek(-6, 2), 6)
299 self.assertEqual(f.read(5), b"world")
300 self.assertEqual(f.read(0), b"")
301 self.assertEqual(f.readinto(bytearray()), 0)
302 self.assertEqual(f.seek(-6, 1), 5)
303 self.assertEqual(f.read(5), b" worl")
304 self.assertEqual(f.tell(), 10)
305 self.assertRaises(TypeError, f.seek, 0.0)
306 if buffered:
307 f.seek(0)
308 self.assertEqual(f.read(), b"hello world\n")
309 f.seek(6)
310 self.assertEqual(f.read(), b"world\n")
311 self.assertEqual(f.read(), b"")
312
313 LARGE = 2**31
314
315 def large_file_ops(self, f):
316 assert f.readable()
317 assert f.writable()
318 self.assertEqual(f.seek(self.LARGE), self.LARGE)
319 self.assertEqual(f.tell(), self.LARGE)
320 self.assertEqual(f.write(b"xxx"), 3)
321 self.assertEqual(f.tell(), self.LARGE + 3)
322 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
323 self.assertEqual(f.truncate(), self.LARGE + 2)
324 self.assertEqual(f.tell(), self.LARGE + 2)
325 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
326 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000327 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000328 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
329 self.assertEqual(f.seek(-1, 2), self.LARGE)
330 self.assertEqual(f.read(2), b"x")
331
Antoine Pitrou19690592009-06-12 20:14:08 +0000332 def test_invalid_operations(self):
333 # Try writing on a file opened in read mode and vice-versa.
334 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000335 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000336 self.assertRaises(IOError, fp.read)
337 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000338 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000339 self.assertRaises(IOError, fp.write, b"blah")
340 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000341 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000342 self.assertRaises(IOError, fp.write, "blah")
343 self.assertRaises(IOError, fp.writelines, ["blah\n"])
344
Christian Heimes1a6387e2008-03-26 12:49:49 +0000345 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000346 with self.open(support.TESTFN, "wb", buffering=0) as f:
347 self.assertEqual(f.readable(), False)
348 self.assertEqual(f.writable(), True)
349 self.assertEqual(f.seekable(), True)
350 self.write_ops(f)
351 with self.open(support.TESTFN, "rb", buffering=0) as f:
352 self.assertEqual(f.readable(), True)
353 self.assertEqual(f.writable(), False)
354 self.assertEqual(f.seekable(), True)
355 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000356
357 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000358 with self.open(support.TESTFN, "wb") as f:
359 self.assertEqual(f.readable(), False)
360 self.assertEqual(f.writable(), True)
361 self.assertEqual(f.seekable(), True)
362 self.write_ops(f)
363 with self.open(support.TESTFN, "rb") as f:
364 self.assertEqual(f.readable(), True)
365 self.assertEqual(f.writable(), False)
366 self.assertEqual(f.seekable(), True)
367 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000368
369 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000370 with self.open(support.TESTFN, "wb") as f:
371 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
372 with self.open(support.TESTFN, "rb") as f:
373 self.assertEqual(f.readline(), b"abc\n")
374 self.assertEqual(f.readline(10), b"def\n")
375 self.assertEqual(f.readline(2), b"xy")
376 self.assertEqual(f.readline(4), b"zzy\n")
377 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000378 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000379 self.assertRaises(TypeError, f.readline, 5.3)
380 with self.open(support.TESTFN, "r") as f:
381 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000382
383 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000384 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000385 self.write_ops(f)
386 data = f.getvalue()
387 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000388 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000389 self.read_ops(f, True)
390
391 def test_large_file_ops(self):
392 # On Windows and Mac OSX this test comsumes large resources; It takes
393 # a long time to build the >2GB file and takes >2GB of disk space
394 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000395 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
396 if not support.is_resource_enabled("largefile"):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000397 print("\nTesting large file ops skipped on %s." % sys.platform,
398 file=sys.stderr)
399 print("It requires %d bytes and a long time." % self.LARGE,
400 file=sys.stderr)
401 print("Use 'regrtest.py -u largefile test_io' to run it.",
402 file=sys.stderr)
403 return
Antoine Pitrou19690592009-06-12 20:14:08 +0000404 with self.open(support.TESTFN, "w+b", 0) as f:
405 self.large_file_ops(f)
406 with self.open(support.TESTFN, "w+b") as f:
407 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000408
409 def test_with_open(self):
410 for bufsize in (0, 1, 100):
411 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000412 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000413 f.write(b"xxx")
414 self.assertEqual(f.closed, True)
415 f = None
416 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000417 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000418 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000419 except ZeroDivisionError:
420 self.assertEqual(f.closed, True)
421 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000422 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000423
Antoine Pitroue741cc62009-01-21 00:45:36 +0000424 # issue 5008
425 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000426 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000427 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000428 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000429 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000430 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000431 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000432 with self.open(support.TESTFN, "a") as f:
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000433 self.assertTrue(f.tell() > 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000434
Christian Heimes1a6387e2008-03-26 12:49:49 +0000435 def test_destructor(self):
436 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000437 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000438 def __del__(self):
439 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000440 try:
441 f = super(MyFileIO, self).__del__
442 except AttributeError:
443 pass
444 else:
445 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000446 def close(self):
447 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000448 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000449 def flush(self):
450 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000451 super(MyFileIO, self).flush()
452 f = MyFileIO(support.TESTFN, "wb")
453 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000454 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000455 support.gc_collect()
456 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000457 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000458 self.assertEqual(f.read(), b"xxx")
459
460 def _check_base_destructor(self, base):
461 record = []
462 class MyIO(base):
463 def __init__(self):
464 # This exercises the availability of attributes on object
465 # destruction.
466 # (in the C version, close() is called by the tp_dealloc
467 # function, not by __del__)
468 self.on_del = 1
469 self.on_close = 2
470 self.on_flush = 3
471 def __del__(self):
472 record.append(self.on_del)
473 try:
474 f = super(MyIO, self).__del__
475 except AttributeError:
476 pass
477 else:
478 f()
479 def close(self):
480 record.append(self.on_close)
481 super(MyIO, self).close()
482 def flush(self):
483 record.append(self.on_flush)
484 super(MyIO, self).flush()
485 f = MyIO()
486 del f
487 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000488 self.assertEqual(record, [1, 2, 3])
489
Antoine Pitrou19690592009-06-12 20:14:08 +0000490 def test_IOBase_destructor(self):
491 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000492
Antoine Pitrou19690592009-06-12 20:14:08 +0000493 def test_RawIOBase_destructor(self):
494 self._check_base_destructor(self.RawIOBase)
495
496 def test_BufferedIOBase_destructor(self):
497 self._check_base_destructor(self.BufferedIOBase)
498
499 def test_TextIOBase_destructor(self):
500 self._check_base_destructor(self.TextIOBase)
501
502 def test_close_flushes(self):
503 with self.open(support.TESTFN, "wb") as f:
504 f.write(b"xxx")
505 with self.open(support.TESTFN, "rb") as f:
506 self.assertEqual(f.read(), b"xxx")
507
508 def test_array_writes(self):
509 a = array.array(b'i', range(10))
510 n = len(a.tostring())
511 with self.open(support.TESTFN, "wb", 0) as f:
512 self.assertEqual(f.write(a), n)
513 with self.open(support.TESTFN, "wb") as f:
514 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000515
516 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000517 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000518 closefd=False)
519
Antoine Pitrou19690592009-06-12 20:14:08 +0000520 def test_read_closed(self):
521 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000522 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000523 with self.open(support.TESTFN, "r") as f:
524 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000525 self.assertEqual(file.read(), "egg\n")
526 file.seek(0)
527 file.close()
528 self.assertRaises(ValueError, file.read)
529
530 def test_no_closefd_with_filename(self):
531 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000532 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000533
534 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000535 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000536 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000537 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000538 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000539 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000540 self.assertEqual(file.buffer.raw.closefd, False)
541
Antoine Pitrou19690592009-06-12 20:14:08 +0000542 def test_garbage_collection(self):
543 # FileIO objects are collected, and collecting them flushes
544 # all data to disk.
545 f = self.FileIO(support.TESTFN, "wb")
546 f.write(b"abcxxx")
547 f.f = f
548 wr = weakref.ref(f)
549 del f
550 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000551 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000552 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000553 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000554
Antoine Pitrou19690592009-06-12 20:14:08 +0000555 def test_unbounded_file(self):
556 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
557 zero = "/dev/zero"
558 if not os.path.exists(zero):
559 self.skipTest("{0} does not exist".format(zero))
560 if sys.maxsize > 0x7FFFFFFF:
561 self.skipTest("test can only run in a 32-bit address space")
562 if support.real_max_memuse < support._2G:
563 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000564 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000565 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000566 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000567 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000568 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000569 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000570
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000571 def test_flush_error_on_close(self):
572 f = self.open(support.TESTFN, "wb", buffering=0)
573 def bad_flush():
574 raise IOError()
575 f.flush = bad_flush
576 self.assertRaises(IOError, f.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600577 self.assertTrue(f.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000578
579 def test_multi_close(self):
580 f = self.open(support.TESTFN, "wb", buffering=0)
581 f.close()
582 f.close()
583 f.close()
584 self.assertRaises(ValueError, f.flush)
585
Antoine Pitrou6391b342010-09-14 18:48:19 +0000586 def test_RawIOBase_read(self):
587 # Exercise the default RawIOBase.read() implementation (which calls
588 # readinto() internally).
589 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
590 self.assertEqual(rawio.read(2), b"ab")
591 self.assertEqual(rawio.read(2), b"c")
592 self.assertEqual(rawio.read(2), b"d")
593 self.assertEqual(rawio.read(2), None)
594 self.assertEqual(rawio.read(2), b"ef")
595 self.assertEqual(rawio.read(2), b"g")
596 self.assertEqual(rawio.read(2), None)
597 self.assertEqual(rawio.read(2), b"")
598
Hynek Schlawack877effc2012-05-25 09:24:18 +0200599 def test_fileio_closefd(self):
600 # Issue #4841
601 with self.open(__file__, 'rb') as f1, \
602 self.open(__file__, 'rb') as f2:
603 fileio = self.FileIO(f1.fileno(), closefd=False)
604 # .__init__() must not close f1
605 fileio.__init__(f2.fileno(), closefd=False)
606 f1.readline()
607 # .close() must not close f2
608 fileio.close()
609 f2.readline()
610
611
Antoine Pitrou19690592009-06-12 20:14:08 +0000612class CIOTest(IOTest):
Antoine Pitrou16166452011-07-12 22:04:20 +0200613
614 def test_IOBase_finalize(self):
615 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
616 # class which inherits IOBase and an object of this class are caught
617 # in a reference cycle and close() is already in the method cache.
618 class MyIO(self.IOBase):
619 def close(self):
620 pass
621
622 # create an instance to populate the method cache
623 MyIO()
624 obj = MyIO()
625 obj.obj = obj
626 wr = weakref.ref(obj)
627 del MyIO
628 del obj
629 support.gc_collect()
630 self.assertTrue(wr() is None, wr)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000631
Antoine Pitrou19690592009-06-12 20:14:08 +0000632class PyIOTest(IOTest):
633 test_array_writes = unittest.skip(
634 "len(array.array) returns number of elements rather than bytelength"
635 )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000636
637
Antoine Pitrou19690592009-06-12 20:14:08 +0000638class CommonBufferedTests:
639 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
640
641 def test_detach(self):
642 raw = self.MockRawIO()
643 buf = self.tp(raw)
644 self.assertIs(buf.detach(), raw)
645 self.assertRaises(ValueError, buf.detach)
646
647 def test_fileno(self):
648 rawio = self.MockRawIO()
649 bufio = self.tp(rawio)
650
Ezio Melotti2623a372010-11-21 13:34:58 +0000651 self.assertEqual(42, bufio.fileno())
Antoine Pitrou19690592009-06-12 20:14:08 +0000652
653 def test_no_fileno(self):
654 # XXX will we always have fileno() function? If so, kill
655 # this test. Else, write it.
656 pass
657
658 def test_invalid_args(self):
659 rawio = self.MockRawIO()
660 bufio = self.tp(rawio)
661 # Invalid whence
662 self.assertRaises(ValueError, bufio.seek, 0, -1)
663 self.assertRaises(ValueError, bufio.seek, 0, 3)
664
665 def test_override_destructor(self):
666 tp = self.tp
667 record = []
668 class MyBufferedIO(tp):
669 def __del__(self):
670 record.append(1)
671 try:
672 f = super(MyBufferedIO, self).__del__
673 except AttributeError:
674 pass
675 else:
676 f()
677 def close(self):
678 record.append(2)
679 super(MyBufferedIO, self).close()
680 def flush(self):
681 record.append(3)
682 super(MyBufferedIO, self).flush()
683 rawio = self.MockRawIO()
684 bufio = MyBufferedIO(rawio)
685 writable = bufio.writable()
686 del bufio
687 support.gc_collect()
688 if writable:
689 self.assertEqual(record, [1, 2, 3])
690 else:
691 self.assertEqual(record, [1, 2])
692
693 def test_context_manager(self):
694 # Test usability as a context manager
695 rawio = self.MockRawIO()
696 bufio = self.tp(rawio)
697 def _with():
698 with bufio:
699 pass
700 _with()
701 # bufio should now be closed, and using it a second time should raise
702 # a ValueError.
703 self.assertRaises(ValueError, _with)
704
705 def test_error_through_destructor(self):
706 # Test that the exception state is not modified by a destructor,
707 # even if close() fails.
708 rawio = self.CloseFailureIO()
709 def f():
710 self.tp(rawio).xyzzy
711 with support.captured_output("stderr") as s:
712 self.assertRaises(AttributeError, f)
713 s = s.getvalue().strip()
714 if s:
715 # The destructor *may* have printed an unraisable error, check it
716 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000717 self.assertTrue(s.startswith("Exception IOError: "), s)
718 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000719
720 def test_repr(self):
721 raw = self.MockRawIO()
722 b = self.tp(raw)
723 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
724 self.assertEqual(repr(b), "<%s>" % clsname)
725 raw.name = "dummy"
726 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
727 raw.name = b"dummy"
728 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000729
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000730 def test_flush_error_on_close(self):
731 raw = self.MockRawIO()
732 def bad_flush():
733 raise IOError()
734 raw.flush = bad_flush
735 b = self.tp(raw)
736 self.assertRaises(IOError, b.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600737 self.assertTrue(b.closed)
738
739 def test_close_error_on_close(self):
740 raw = self.MockRawIO()
741 def bad_flush():
742 raise IOError('flush')
743 def bad_close():
744 raise IOError('close')
745 raw.close = bad_close
746 b = self.tp(raw)
747 b.flush = bad_flush
748 with self.assertRaises(IOError) as err: # exception not swallowed
749 b.close()
750 self.assertEqual(err.exception.args, ('close',))
751 self.assertFalse(b.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000752
753 def test_multi_close(self):
754 raw = self.MockRawIO()
755 b = self.tp(raw)
756 b.close()
757 b.close()
758 b.close()
759 self.assertRaises(ValueError, b.flush)
760
Antoine Pitroufc9ead62010-12-21 21:26:55 +0000761 def test_readonly_attributes(self):
762 raw = self.MockRawIO()
763 buf = self.tp(raw)
764 x = self.MockRawIO()
765 with self.assertRaises((AttributeError, TypeError)):
766 buf.raw = x
767
Christian Heimes1a6387e2008-03-26 12:49:49 +0000768
Antoine Pitroubff5df02012-07-29 19:02:46 +0200769class SizeofTest:
770
771 @support.cpython_only
772 def test_sizeof(self):
773 bufsize1 = 4096
774 bufsize2 = 8192
775 rawio = self.MockRawIO()
776 bufio = self.tp(rawio, buffer_size=bufsize1)
777 size = sys.getsizeof(bufio) - bufsize1
778 rawio = self.MockRawIO()
779 bufio = self.tp(rawio, buffer_size=bufsize2)
780 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
781
782
Antoine Pitrou19690592009-06-12 20:14:08 +0000783class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
784 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000785
Antoine Pitrou19690592009-06-12 20:14:08 +0000786 def test_constructor(self):
787 rawio = self.MockRawIO([b"abc"])
788 bufio = self.tp(rawio)
789 bufio.__init__(rawio)
790 bufio.__init__(rawio, buffer_size=1024)
791 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000792 self.assertEqual(b"abc", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000793 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
794 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
795 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
796 rawio = self.MockRawIO([b"abc"])
797 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000798 self.assertEqual(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000799
Antoine Pitrou19690592009-06-12 20:14:08 +0000800 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000801 for arg in (None, 7):
802 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
803 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000804 self.assertEqual(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000805 # Invalid args
806 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000807
Antoine Pitrou19690592009-06-12 20:14:08 +0000808 def test_read1(self):
809 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
810 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000811 self.assertEqual(b"a", bufio.read(1))
812 self.assertEqual(b"b", bufio.read1(1))
813 self.assertEqual(rawio._reads, 1)
814 self.assertEqual(b"c", bufio.read1(100))
815 self.assertEqual(rawio._reads, 1)
816 self.assertEqual(b"d", bufio.read1(100))
817 self.assertEqual(rawio._reads, 2)
818 self.assertEqual(b"efg", bufio.read1(100))
819 self.assertEqual(rawio._reads, 3)
820 self.assertEqual(b"", bufio.read1(100))
821 self.assertEqual(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000822 # Invalid args
823 self.assertRaises(ValueError, bufio.read1, -1)
824
825 def test_readinto(self):
826 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
827 bufio = self.tp(rawio)
828 b = bytearray(2)
Ezio Melotti2623a372010-11-21 13:34:58 +0000829 self.assertEqual(bufio.readinto(b), 2)
830 self.assertEqual(b, b"ab")
831 self.assertEqual(bufio.readinto(b), 2)
832 self.assertEqual(b, b"cd")
833 self.assertEqual(bufio.readinto(b), 2)
834 self.assertEqual(b, b"ef")
835 self.assertEqual(bufio.readinto(b), 1)
836 self.assertEqual(b, b"gf")
837 self.assertEqual(bufio.readinto(b), 0)
838 self.assertEqual(b, b"gf")
Antoine Pitrou19690592009-06-12 20:14:08 +0000839
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000840 def test_readlines(self):
841 def bufio():
842 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
843 return self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000844 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
845 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
846 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000847
Antoine Pitrou19690592009-06-12 20:14:08 +0000848 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000849 data = b"abcdefghi"
850 dlen = len(data)
851
852 tests = [
853 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
854 [ 100, [ 3, 3, 3], [ dlen ] ],
855 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
856 ]
857
858 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000859 rawio = self.MockFileIO(data)
860 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000861 pos = 0
862 for nbytes in buf_read_sizes:
Ezio Melotti2623a372010-11-21 13:34:58 +0000863 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Christian Heimes1a6387e2008-03-26 12:49:49 +0000864 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000865 # this is mildly implementation-dependent
Ezio Melotti2623a372010-11-21 13:34:58 +0000866 self.assertEqual(rawio.read_history, raw_read_sizes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000867
Antoine Pitrou19690592009-06-12 20:14:08 +0000868 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000869 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000870 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
871 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000872 self.assertEqual(b"abcd", bufio.read(6))
873 self.assertEqual(b"e", bufio.read(1))
874 self.assertEqual(b"fg", bufio.read())
875 self.assertEqual(b"", bufio.peek(1))
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200876 self.assertIsNone(bufio.read())
Ezio Melotti2623a372010-11-21 13:34:58 +0000877 self.assertEqual(b"", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000878
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200879 rawio = self.MockRawIO((b"a", None, None))
880 self.assertEqual(b"a", rawio.readall())
881 self.assertIsNone(rawio.readall())
882
Antoine Pitrou19690592009-06-12 20:14:08 +0000883 def test_read_past_eof(self):
884 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
885 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000886
Ezio Melotti2623a372010-11-21 13:34:58 +0000887 self.assertEqual(b"abcdefg", bufio.read(9000))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000888
Antoine Pitrou19690592009-06-12 20:14:08 +0000889 def test_read_all(self):
890 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
891 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000892
Ezio Melotti2623a372010-11-21 13:34:58 +0000893 self.assertEqual(b"abcdefg", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000894
Victor Stinner6a102812010-04-27 23:55:59 +0000895 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +0000896 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +0000897 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000898 try:
899 # Write out many bytes with exactly the same number of 0's,
900 # 1's... 255's. This will help us check that concurrent reading
901 # doesn't duplicate or forget contents.
902 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +0000903 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000904 random.shuffle(l)
905 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000906 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000907 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000908 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000909 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000910 errors = []
911 results = []
912 def f():
913 try:
914 # Intra-buffer read then buffer-flushing read
915 for n in cycle([1, 19]):
916 s = bufio.read(n)
917 if not s:
918 break
919 # list.append() is atomic
920 results.append(s)
921 except Exception as e:
922 errors.append(e)
923 raise
924 threads = [threading.Thread(target=f) for x in range(20)]
925 for t in threads:
926 t.start()
927 time.sleep(0.02) # yield
928 for t in threads:
929 t.join()
930 self.assertFalse(errors,
931 "the following exceptions were caught: %r" % errors)
932 s = b''.join(results)
933 for i in range(256):
934 c = bytes(bytearray([i]))
935 self.assertEqual(s.count(c), N)
936 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +0000937 support.unlink(support.TESTFN)
938
939 def test_misbehaved_io(self):
940 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
941 bufio = self.tp(rawio)
942 self.assertRaises(IOError, bufio.seek, 0)
943 self.assertRaises(IOError, bufio.tell)
944
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000945 def test_no_extraneous_read(self):
946 # Issue #9550; when the raw IO object has satisfied the read request,
947 # we should not issue any additional reads, otherwise it may block
948 # (e.g. socket).
949 bufsize = 16
950 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
951 rawio = self.MockRawIO([b"x" * n])
952 bufio = self.tp(rawio, bufsize)
953 self.assertEqual(bufio.read(n), b"x" * n)
954 # Simple case: one raw read is enough to satisfy the request.
955 self.assertEqual(rawio._extraneous_reads, 0,
956 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
957 # A more complex case where two raw reads are needed to satisfy
958 # the request.
959 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
960 bufio = self.tp(rawio, bufsize)
961 self.assertEqual(bufio.read(n), b"x" * n)
962 self.assertEqual(rawio._extraneous_reads, 0,
963 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
964
965
Antoine Pitroubff5df02012-07-29 19:02:46 +0200966class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +0000967 tp = io.BufferedReader
968
969 def test_constructor(self):
970 BufferedReaderTest.test_constructor(self)
971 # The allocation can succeed on 32-bit builds, e.g. with more
972 # than 2GB RAM and a 64-bit kernel.
973 if sys.maxsize > 0x7FFFFFFF:
974 rawio = self.MockRawIO()
975 bufio = self.tp(rawio)
976 self.assertRaises((OverflowError, MemoryError, ValueError),
977 bufio.__init__, rawio, sys.maxsize)
978
979 def test_initialization(self):
980 rawio = self.MockRawIO([b"abc"])
981 bufio = self.tp(rawio)
982 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
983 self.assertRaises(ValueError, bufio.read)
984 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
985 self.assertRaises(ValueError, bufio.read)
986 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
987 self.assertRaises(ValueError, bufio.read)
988
989 def test_misbehaved_io_read(self):
990 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
991 bufio = self.tp(rawio)
992 # _pyio.BufferedReader seems to implement reading different, so that
993 # checking this is not so easy.
994 self.assertRaises(IOError, bufio.read, 10)
995
996 def test_garbage_collection(self):
997 # C BufferedReader objects are collected.
998 # The Python version has __del__, so it ends into gc.garbage instead
999 rawio = self.FileIO(support.TESTFN, "w+b")
1000 f = self.tp(rawio)
1001 f.f = f
1002 wr = weakref.ref(f)
1003 del f
1004 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001005 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00001006
R David Murray5b2cf5e2013-02-23 22:11:21 -05001007 def test_args_error(self):
1008 # Issue #17275
1009 with self.assertRaisesRegexp(TypeError, "BufferedReader"):
1010 self.tp(io.BytesIO(), 1024, 1024, 1024)
1011
1012
Antoine Pitrou19690592009-06-12 20:14:08 +00001013class PyBufferedReaderTest(BufferedReaderTest):
1014 tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001015
1016
Antoine Pitrou19690592009-06-12 20:14:08 +00001017class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1018 write_mode = "wb"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001019
Antoine Pitrou19690592009-06-12 20:14:08 +00001020 def test_constructor(self):
1021 rawio = self.MockRawIO()
1022 bufio = self.tp(rawio)
1023 bufio.__init__(rawio)
1024 bufio.__init__(rawio, buffer_size=1024)
1025 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +00001026 self.assertEqual(3, bufio.write(b"abc"))
Antoine Pitrou19690592009-06-12 20:14:08 +00001027 bufio.flush()
1028 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1029 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1030 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1031 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +00001032 self.assertEqual(3, bufio.write(b"ghi"))
Antoine Pitrou19690592009-06-12 20:14:08 +00001033 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001034 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001035
Antoine Pitrou19690592009-06-12 20:14:08 +00001036 def test_detach_flush(self):
1037 raw = self.MockRawIO()
1038 buf = self.tp(raw)
1039 buf.write(b"howdy!")
1040 self.assertFalse(raw._write_stack)
1041 buf.detach()
1042 self.assertEqual(raw._write_stack, [b"howdy!"])
1043
1044 def test_write(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001045 # Write to the buffered IO but don't overflow the buffer.
Antoine Pitrou19690592009-06-12 20:14:08 +00001046 writer = self.MockRawIO()
1047 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001048 bufio.write(b"abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001049 self.assertFalse(writer._write_stack)
1050
Antoine Pitrou19690592009-06-12 20:14:08 +00001051 def test_write_overflow(self):
1052 writer = self.MockRawIO()
1053 bufio = self.tp(writer, 8)
1054 contents = b"abcdefghijklmnop"
1055 for n in range(0, len(contents), 3):
1056 bufio.write(contents[n:n+3])
1057 flushed = b"".join(writer._write_stack)
1058 # At least (total - 8) bytes were implicitly flushed, perhaps more
1059 # depending on the implementation.
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001060 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001061
Antoine Pitrou19690592009-06-12 20:14:08 +00001062 def check_writes(self, intermediate_func):
1063 # Lots of writes, test the flushed output is as expected.
1064 contents = bytes(range(256)) * 1000
1065 n = 0
1066 writer = self.MockRawIO()
1067 bufio = self.tp(writer, 13)
1068 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1069 def gen_sizes():
1070 for size in count(1):
1071 for i in range(15):
1072 yield size
1073 sizes = gen_sizes()
1074 while n < len(contents):
1075 size = min(next(sizes), len(contents) - n)
Ezio Melotti2623a372010-11-21 13:34:58 +00001076 self.assertEqual(bufio.write(contents[n:n+size]), size)
Antoine Pitrou19690592009-06-12 20:14:08 +00001077 intermediate_func(bufio)
1078 n += size
1079 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001080 self.assertEqual(contents,
Antoine Pitrou19690592009-06-12 20:14:08 +00001081 b"".join(writer._write_stack))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001082
Antoine Pitrou19690592009-06-12 20:14:08 +00001083 def test_writes(self):
1084 self.check_writes(lambda bufio: None)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001085
Antoine Pitrou19690592009-06-12 20:14:08 +00001086 def test_writes_and_flushes(self):
1087 self.check_writes(lambda bufio: bufio.flush())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001088
Antoine Pitrou19690592009-06-12 20:14:08 +00001089 def test_writes_and_seeks(self):
1090 def _seekabs(bufio):
1091 pos = bufio.tell()
1092 bufio.seek(pos + 1, 0)
1093 bufio.seek(pos - 1, 0)
1094 bufio.seek(pos, 0)
1095 self.check_writes(_seekabs)
1096 def _seekrel(bufio):
1097 pos = bufio.seek(0, 1)
1098 bufio.seek(+1, 1)
1099 bufio.seek(-1, 1)
1100 bufio.seek(pos, 0)
1101 self.check_writes(_seekrel)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001102
Antoine Pitrou19690592009-06-12 20:14:08 +00001103 def test_writes_and_truncates(self):
1104 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001105
Antoine Pitrou19690592009-06-12 20:14:08 +00001106 def test_write_non_blocking(self):
1107 raw = self.MockNonBlockWriterIO()
1108 bufio = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001109
Ezio Melotti2623a372010-11-21 13:34:58 +00001110 self.assertEqual(bufio.write(b"abcd"), 4)
1111 self.assertEqual(bufio.write(b"efghi"), 5)
Antoine Pitrou19690592009-06-12 20:14:08 +00001112 # 1 byte will be written, the rest will be buffered
1113 raw.block_on(b"k")
Ezio Melotti2623a372010-11-21 13:34:58 +00001114 self.assertEqual(bufio.write(b"jklmn"), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001115
Antoine Pitrou19690592009-06-12 20:14:08 +00001116 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1117 raw.block_on(b"0")
1118 try:
1119 bufio.write(b"opqrwxyz0123456789")
1120 except self.BlockingIOError as e:
1121 written = e.characters_written
1122 else:
1123 self.fail("BlockingIOError should have been raised")
Ezio Melotti2623a372010-11-21 13:34:58 +00001124 self.assertEqual(written, 16)
1125 self.assertEqual(raw.pop_written(),
Antoine Pitrou19690592009-06-12 20:14:08 +00001126 b"abcdefghijklmnopqrwxyz")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001127
Ezio Melotti2623a372010-11-21 13:34:58 +00001128 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Antoine Pitrou19690592009-06-12 20:14:08 +00001129 s = raw.pop_written()
1130 # Previously buffered bytes were flushed
1131 self.assertTrue(s.startswith(b"01234567A"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001132
Antoine Pitrou19690592009-06-12 20:14:08 +00001133 def test_write_and_rewind(self):
1134 raw = io.BytesIO()
1135 bufio = self.tp(raw, 4)
1136 self.assertEqual(bufio.write(b"abcdef"), 6)
1137 self.assertEqual(bufio.tell(), 6)
1138 bufio.seek(0, 0)
1139 self.assertEqual(bufio.write(b"XY"), 2)
1140 bufio.seek(6, 0)
1141 self.assertEqual(raw.getvalue(), b"XYcdef")
1142 self.assertEqual(bufio.write(b"123456"), 6)
1143 bufio.flush()
1144 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001145
Antoine Pitrou19690592009-06-12 20:14:08 +00001146 def test_flush(self):
1147 writer = self.MockRawIO()
1148 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001149 bufio.write(b"abc")
1150 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001151 self.assertEqual(b"abc", writer._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001152
Antoine Pitrou78e761e2012-10-16 22:57:11 +02001153 def test_writelines(self):
1154 l = [b'ab', b'cd', b'ef']
1155 writer = self.MockRawIO()
1156 bufio = self.tp(writer, 8)
1157 bufio.writelines(l)
1158 bufio.flush()
1159 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1160
1161 def test_writelines_userlist(self):
1162 l = UserList([b'ab', b'cd', b'ef'])
1163 writer = self.MockRawIO()
1164 bufio = self.tp(writer, 8)
1165 bufio.writelines(l)
1166 bufio.flush()
1167 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1168
1169 def test_writelines_error(self):
1170 writer = self.MockRawIO()
1171 bufio = self.tp(writer, 8)
1172 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1173 self.assertRaises(TypeError, bufio.writelines, None)
1174
Antoine Pitrou19690592009-06-12 20:14:08 +00001175 def test_destructor(self):
1176 writer = self.MockRawIO()
1177 bufio = self.tp(writer, 8)
1178 bufio.write(b"abc")
1179 del bufio
1180 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001181 self.assertEqual(b"abc", writer._write_stack[0])
Antoine Pitrou19690592009-06-12 20:14:08 +00001182
1183 def test_truncate(self):
1184 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001185 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001186 bufio = self.tp(raw, 8)
1187 bufio.write(b"abcdef")
1188 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001189 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001190 with self.open(support.TESTFN, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001191 self.assertEqual(f.read(), b"abc")
1192
Victor Stinner6a102812010-04-27 23:55:59 +00001193 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +00001194 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +00001195 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001196 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001197 # Write out many bytes from many threads and test they were
1198 # all flushed.
1199 N = 1000
1200 contents = bytes(range(256)) * N
1201 sizes = cycle([1, 19])
1202 n = 0
1203 queue = deque()
1204 while n < len(contents):
1205 size = next(sizes)
1206 queue.append(contents[n:n+size])
1207 n += size
1208 del contents
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001209 # We use a real file object because it allows us to
1210 # exercise situations where the GIL is released before
1211 # writing the buffer to the raw streams. This is in addition
1212 # to concurrency issues due to switching threads in the middle
1213 # of Python code.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001214 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001215 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001216 errors = []
1217 def f():
1218 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001219 while True:
1220 try:
1221 s = queue.popleft()
1222 except IndexError:
1223 return
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001224 bufio.write(s)
1225 except Exception as e:
1226 errors.append(e)
1227 raise
1228 threads = [threading.Thread(target=f) for x in range(20)]
1229 for t in threads:
1230 t.start()
1231 time.sleep(0.02) # yield
1232 for t in threads:
1233 t.join()
1234 self.assertFalse(errors,
1235 "the following exceptions were caught: %r" % errors)
Antoine Pitrou19690592009-06-12 20:14:08 +00001236 bufio.close()
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001237 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001238 s = f.read()
1239 for i in range(256):
Ezio Melotti2623a372010-11-21 13:34:58 +00001240 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001241 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001242 support.unlink(support.TESTFN)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001243
Antoine Pitrou19690592009-06-12 20:14:08 +00001244 def test_misbehaved_io(self):
1245 rawio = self.MisbehavedRawIO()
1246 bufio = self.tp(rawio, 5)
1247 self.assertRaises(IOError, bufio.seek, 0)
1248 self.assertRaises(IOError, bufio.tell)
1249 self.assertRaises(IOError, bufio.write, b"abcdef")
1250
1251 def test_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001252 with support.check_warnings(("max_buffer_size is deprecated",
1253 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001254 self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001255
Benjamin Petersona2d6d712012-12-20 12:24:10 -06001256 def test_write_error_on_close(self):
1257 raw = self.MockRawIO()
1258 def bad_write(b):
1259 raise IOError()
1260 raw.write = bad_write
1261 b = self.tp(raw)
1262 b.write(b'spam')
1263 self.assertRaises(IOError, b.close) # exception not swallowed
1264 self.assertTrue(b.closed)
1265
Antoine Pitrou19690592009-06-12 20:14:08 +00001266
Antoine Pitroubff5df02012-07-29 19:02:46 +02001267class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001268 tp = io.BufferedWriter
1269
1270 def test_constructor(self):
1271 BufferedWriterTest.test_constructor(self)
1272 # The allocation can succeed on 32-bit builds, e.g. with more
1273 # than 2GB RAM and a 64-bit kernel.
1274 if sys.maxsize > 0x7FFFFFFF:
1275 rawio = self.MockRawIO()
1276 bufio = self.tp(rawio)
1277 self.assertRaises((OverflowError, MemoryError, ValueError),
1278 bufio.__init__, rawio, sys.maxsize)
1279
1280 def test_initialization(self):
1281 rawio = self.MockRawIO()
1282 bufio = self.tp(rawio)
1283 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1284 self.assertRaises(ValueError, bufio.write, b"def")
1285 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1286 self.assertRaises(ValueError, bufio.write, b"def")
1287 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1288 self.assertRaises(ValueError, bufio.write, b"def")
1289
1290 def test_garbage_collection(self):
1291 # C BufferedWriter objects are collected, and collecting them flushes
1292 # all data to disk.
1293 # The Python version has __del__, so it ends into gc.garbage instead
1294 rawio = self.FileIO(support.TESTFN, "w+b")
1295 f = self.tp(rawio)
1296 f.write(b"123xxx")
1297 f.x = f
1298 wr = weakref.ref(f)
1299 del f
1300 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001301 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001302 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001303 self.assertEqual(f.read(), b"123xxx")
1304
R David Murray5b2cf5e2013-02-23 22:11:21 -05001305 def test_args_error(self):
1306 # Issue #17275
1307 with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
1308 self.tp(io.BytesIO(), 1024, 1024, 1024)
1309
Antoine Pitrou19690592009-06-12 20:14:08 +00001310
1311class PyBufferedWriterTest(BufferedWriterTest):
1312 tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001313
1314class BufferedRWPairTest(unittest.TestCase):
1315
Antoine Pitrou19690592009-06-12 20:14:08 +00001316 def test_constructor(self):
1317 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001318 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001319
Antoine Pitrou19690592009-06-12 20:14:08 +00001320 def test_detach(self):
1321 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1322 self.assertRaises(self.UnsupportedOperation, pair.detach)
1323
1324 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001325 with support.check_warnings(("max_buffer_size is deprecated",
1326 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001327 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001328
1329 def test_constructor_with_not_readable(self):
1330 class NotReadable(MockRawIO):
1331 def readable(self):
1332 return False
1333
1334 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1335
1336 def test_constructor_with_not_writeable(self):
1337 class NotWriteable(MockRawIO):
1338 def writable(self):
1339 return False
1340
1341 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1342
1343 def test_read(self):
1344 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1345
1346 self.assertEqual(pair.read(3), b"abc")
1347 self.assertEqual(pair.read(1), b"d")
1348 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001349 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1350 self.assertEqual(pair.read(None), b"abc")
1351
1352 def test_readlines(self):
1353 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1354 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1355 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1356 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001357
1358 def test_read1(self):
1359 # .read1() is delegated to the underlying reader object, so this test
1360 # can be shallow.
1361 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1362
1363 self.assertEqual(pair.read1(3), b"abc")
1364
1365 def test_readinto(self):
1366 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1367
1368 data = bytearray(5)
1369 self.assertEqual(pair.readinto(data), 5)
1370 self.assertEqual(data, b"abcde")
1371
1372 def test_write(self):
1373 w = self.MockRawIO()
1374 pair = self.tp(self.MockRawIO(), w)
1375
1376 pair.write(b"abc")
1377 pair.flush()
1378 pair.write(b"def")
1379 pair.flush()
1380 self.assertEqual(w._write_stack, [b"abc", b"def"])
1381
1382 def test_peek(self):
1383 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1384
1385 self.assertTrue(pair.peek(3).startswith(b"abc"))
1386 self.assertEqual(pair.read(3), b"abc")
1387
1388 def test_readable(self):
1389 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1390 self.assertTrue(pair.readable())
1391
1392 def test_writeable(self):
1393 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1394 self.assertTrue(pair.writable())
1395
1396 def test_seekable(self):
1397 # BufferedRWPairs are never seekable, even if their readers and writers
1398 # are.
1399 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1400 self.assertFalse(pair.seekable())
1401
1402 # .flush() is delegated to the underlying writer object and has been
1403 # tested in the test_write method.
1404
1405 def test_close_and_closed(self):
1406 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1407 self.assertFalse(pair.closed)
1408 pair.close()
1409 self.assertTrue(pair.closed)
1410
1411 def test_isatty(self):
1412 class SelectableIsAtty(MockRawIO):
1413 def __init__(self, isatty):
1414 MockRawIO.__init__(self)
1415 self._isatty = isatty
1416
1417 def isatty(self):
1418 return self._isatty
1419
1420 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1421 self.assertFalse(pair.isatty())
1422
1423 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1424 self.assertTrue(pair.isatty())
1425
1426 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1427 self.assertTrue(pair.isatty())
1428
1429 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1430 self.assertTrue(pair.isatty())
1431
1432class CBufferedRWPairTest(BufferedRWPairTest):
1433 tp = io.BufferedRWPair
1434
1435class PyBufferedRWPairTest(BufferedRWPairTest):
1436 tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001437
1438
Antoine Pitrou19690592009-06-12 20:14:08 +00001439class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1440 read_mode = "rb+"
1441 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001442
Antoine Pitrou19690592009-06-12 20:14:08 +00001443 def test_constructor(self):
1444 BufferedReaderTest.test_constructor(self)
1445 BufferedWriterTest.test_constructor(self)
1446
1447 def test_read_and_write(self):
1448 raw = self.MockRawIO((b"asdf", b"ghjk"))
1449 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001450
1451 self.assertEqual(b"as", rw.read(2))
1452 rw.write(b"ddd")
1453 rw.write(b"eee")
1454 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001455 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001456 self.assertEqual(b"dddeee", raw._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001457
Antoine Pitrou19690592009-06-12 20:14:08 +00001458 def test_seek_and_tell(self):
1459 raw = self.BytesIO(b"asdfghjkl")
1460 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001461
Ezio Melotti2623a372010-11-21 13:34:58 +00001462 self.assertEqual(b"as", rw.read(2))
1463 self.assertEqual(2, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001464 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001465 self.assertEqual(b"asdf", rw.read(4))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001466
Antoine Pitrou808cec52011-08-20 15:40:58 +02001467 rw.write(b"123f")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001468 rw.seek(0, 0)
Antoine Pitrou808cec52011-08-20 15:40:58 +02001469 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001470 self.assertEqual(9, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001471 rw.seek(-4, 2)
Ezio Melotti2623a372010-11-21 13:34:58 +00001472 self.assertEqual(5, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001473 rw.seek(2, 1)
Ezio Melotti2623a372010-11-21 13:34:58 +00001474 self.assertEqual(7, rw.tell())
1475 self.assertEqual(b"fl", rw.read(11))
Antoine Pitrou808cec52011-08-20 15:40:58 +02001476 rw.flush()
1477 self.assertEqual(b"asdf123fl", raw.getvalue())
1478
Christian Heimes1a6387e2008-03-26 12:49:49 +00001479 self.assertRaises(TypeError, rw.seek, 0.0)
1480
Antoine Pitrou19690592009-06-12 20:14:08 +00001481 def check_flush_and_read(self, read_func):
1482 raw = self.BytesIO(b"abcdefghi")
1483 bufio = self.tp(raw)
1484
Ezio Melotti2623a372010-11-21 13:34:58 +00001485 self.assertEqual(b"ab", read_func(bufio, 2))
Antoine Pitrou19690592009-06-12 20:14:08 +00001486 bufio.write(b"12")
Ezio Melotti2623a372010-11-21 13:34:58 +00001487 self.assertEqual(b"ef", read_func(bufio, 2))
1488 self.assertEqual(6, bufio.tell())
Antoine Pitrou19690592009-06-12 20:14:08 +00001489 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001490 self.assertEqual(6, bufio.tell())
1491 self.assertEqual(b"ghi", read_func(bufio))
Antoine Pitrou19690592009-06-12 20:14:08 +00001492 raw.seek(0, 0)
1493 raw.write(b"XYZ")
1494 # flush() resets the read buffer
1495 bufio.flush()
1496 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001497 self.assertEqual(b"XYZ", read_func(bufio, 3))
Antoine Pitrou19690592009-06-12 20:14:08 +00001498
1499 def test_flush_and_read(self):
1500 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1501
1502 def test_flush_and_readinto(self):
1503 def _readinto(bufio, n=-1):
1504 b = bytearray(n if n >= 0 else 9999)
1505 n = bufio.readinto(b)
1506 return bytes(b[:n])
1507 self.check_flush_and_read(_readinto)
1508
1509 def test_flush_and_peek(self):
1510 def _peek(bufio, n=-1):
1511 # This relies on the fact that the buffer can contain the whole
1512 # raw stream, otherwise peek() can return less.
1513 b = bufio.peek(n)
1514 if n != -1:
1515 b = b[:n]
1516 bufio.seek(len(b), 1)
1517 return b
1518 self.check_flush_and_read(_peek)
1519
1520 def test_flush_and_write(self):
1521 raw = self.BytesIO(b"abcdefghi")
1522 bufio = self.tp(raw)
1523
1524 bufio.write(b"123")
1525 bufio.flush()
1526 bufio.write(b"45")
1527 bufio.flush()
1528 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001529 self.assertEqual(b"12345fghi", raw.getvalue())
1530 self.assertEqual(b"12345fghi", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +00001531
1532 def test_threads(self):
1533 BufferedReaderTest.test_threads(self)
1534 BufferedWriterTest.test_threads(self)
1535
1536 def test_writes_and_peek(self):
1537 def _peek(bufio):
1538 bufio.peek(1)
1539 self.check_writes(_peek)
1540 def _peek(bufio):
1541 pos = bufio.tell()
1542 bufio.seek(-1, 1)
1543 bufio.peek(1)
1544 bufio.seek(pos, 0)
1545 self.check_writes(_peek)
1546
1547 def test_writes_and_reads(self):
1548 def _read(bufio):
1549 bufio.seek(-1, 1)
1550 bufio.read(1)
1551 self.check_writes(_read)
1552
1553 def test_writes_and_read1s(self):
1554 def _read1(bufio):
1555 bufio.seek(-1, 1)
1556 bufio.read1(1)
1557 self.check_writes(_read1)
1558
1559 def test_writes_and_readintos(self):
1560 def _read(bufio):
1561 bufio.seek(-1, 1)
1562 bufio.readinto(bytearray(1))
1563 self.check_writes(_read)
1564
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001565 def test_write_after_readahead(self):
1566 # Issue #6629: writing after the buffer was filled by readahead should
1567 # first rewind the raw stream.
1568 for overwrite_size in [1, 5]:
1569 raw = self.BytesIO(b"A" * 10)
1570 bufio = self.tp(raw, 4)
1571 # Trigger readahead
1572 self.assertEqual(bufio.read(1), b"A")
1573 self.assertEqual(bufio.tell(), 1)
1574 # Overwriting should rewind the raw stream if it needs so
1575 bufio.write(b"B" * overwrite_size)
1576 self.assertEqual(bufio.tell(), overwrite_size + 1)
1577 # If the write size was smaller than the buffer size, flush() and
1578 # check that rewind happens.
1579 bufio.flush()
1580 self.assertEqual(bufio.tell(), overwrite_size + 1)
1581 s = raw.getvalue()
1582 self.assertEqual(s,
1583 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1584
Antoine Pitrouee46a7b2011-05-13 00:31:52 +02001585 def test_write_rewind_write(self):
1586 # Various combinations of reading / writing / seeking backwards / writing again
1587 def mutate(bufio, pos1, pos2):
1588 assert pos2 >= pos1
1589 # Fill the buffer
1590 bufio.seek(pos1)
1591 bufio.read(pos2 - pos1)
1592 bufio.write(b'\x02')
1593 # This writes earlier than the previous write, but still inside
1594 # the buffer.
1595 bufio.seek(pos1)
1596 bufio.write(b'\x01')
1597
1598 b = b"\x80\x81\x82\x83\x84"
1599 for i in range(0, len(b)):
1600 for j in range(i, len(b)):
1601 raw = self.BytesIO(b)
1602 bufio = self.tp(raw, 100)
1603 mutate(bufio, i, j)
1604 bufio.flush()
1605 expected = bytearray(b)
1606 expected[j] = 2
1607 expected[i] = 1
1608 self.assertEqual(raw.getvalue(), expected,
1609 "failed result for i=%d, j=%d" % (i, j))
1610
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001611 def test_truncate_after_read_or_write(self):
1612 raw = self.BytesIO(b"A" * 10)
1613 bufio = self.tp(raw, 100)
1614 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1615 self.assertEqual(bufio.truncate(), 2)
1616 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1617 self.assertEqual(bufio.truncate(), 4)
1618
Antoine Pitrou19690592009-06-12 20:14:08 +00001619 def test_misbehaved_io(self):
1620 BufferedReaderTest.test_misbehaved_io(self)
1621 BufferedWriterTest.test_misbehaved_io(self)
1622
Antoine Pitrou808cec52011-08-20 15:40:58 +02001623 def test_interleaved_read_write(self):
1624 # Test for issue #12213
1625 with self.BytesIO(b'abcdefgh') as raw:
1626 with self.tp(raw, 100) as f:
1627 f.write(b"1")
1628 self.assertEqual(f.read(1), b'b')
1629 f.write(b'2')
1630 self.assertEqual(f.read1(1), b'd')
1631 f.write(b'3')
1632 buf = bytearray(1)
1633 f.readinto(buf)
1634 self.assertEqual(buf, b'f')
1635 f.write(b'4')
1636 self.assertEqual(f.peek(1), b'h')
1637 f.flush()
1638 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1639
1640 with self.BytesIO(b'abc') as raw:
1641 with self.tp(raw, 100) as f:
1642 self.assertEqual(f.read(1), b'a')
1643 f.write(b"2")
1644 self.assertEqual(f.read(1), b'c')
1645 f.flush()
1646 self.assertEqual(raw.getvalue(), b'a2c')
1647
1648 def test_interleaved_readline_write(self):
1649 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1650 with self.tp(raw) as f:
1651 f.write(b'1')
1652 self.assertEqual(f.readline(), b'b\n')
1653 f.write(b'2')
1654 self.assertEqual(f.readline(), b'def\n')
1655 f.write(b'3')
1656 self.assertEqual(f.readline(), b'\n')
1657 f.flush()
1658 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1659
R David Murray5b2cf5e2013-02-23 22:11:21 -05001660
Antoine Pitroubff5df02012-07-29 19:02:46 +02001661class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
1662 BufferedRandomTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001663 tp = io.BufferedRandom
1664
1665 def test_constructor(self):
1666 BufferedRandomTest.test_constructor(self)
1667 # The allocation can succeed on 32-bit builds, e.g. with more
1668 # than 2GB RAM and a 64-bit kernel.
1669 if sys.maxsize > 0x7FFFFFFF:
1670 rawio = self.MockRawIO()
1671 bufio = self.tp(rawio)
1672 self.assertRaises((OverflowError, MemoryError, ValueError),
1673 bufio.__init__, rawio, sys.maxsize)
1674
1675 def test_garbage_collection(self):
1676 CBufferedReaderTest.test_garbage_collection(self)
1677 CBufferedWriterTest.test_garbage_collection(self)
1678
R David Murray5b2cf5e2013-02-23 22:11:21 -05001679 def test_args_error(self):
1680 # Issue #17275
1681 with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
1682 self.tp(io.BytesIO(), 1024, 1024, 1024)
1683
1684
Antoine Pitrou19690592009-06-12 20:14:08 +00001685class PyBufferedRandomTest(BufferedRandomTest):
1686 tp = pyio.BufferedRandom
1687
1688
Christian Heimes1a6387e2008-03-26 12:49:49 +00001689# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1690# properties:
1691# - A single output character can correspond to many bytes of input.
1692# - The number of input bytes to complete the character can be
1693# undetermined until the last input byte is received.
1694# - The number of input bytes can vary depending on previous input.
1695# - A single input byte can correspond to many characters of output.
1696# - The number of output characters can be undetermined until the
1697# last input byte is received.
1698# - The number of output characters can vary depending on previous input.
1699
1700class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1701 """
1702 For testing seek/tell behavior with a stateful, buffering decoder.
1703
1704 Input is a sequence of words. Words may be fixed-length (length set
1705 by input) or variable-length (period-terminated). In variable-length
1706 mode, extra periods are ignored. Possible words are:
1707 - 'i' followed by a number sets the input length, I (maximum 99).
1708 When I is set to 0, words are space-terminated.
1709 - 'o' followed by a number sets the output length, O (maximum 99).
1710 - Any other word is converted into a word followed by a period on
1711 the output. The output word consists of the input word truncated
1712 or padded out with hyphens to make its length equal to O. If O
1713 is 0, the word is output verbatim without truncating or padding.
1714 I and O are initially set to 1. When I changes, any buffered input is
1715 re-scanned according to the new I. EOF also terminates the last word.
1716 """
1717
1718 def __init__(self, errors='strict'):
1719 codecs.IncrementalDecoder.__init__(self, errors)
1720 self.reset()
1721
1722 def __repr__(self):
1723 return '<SID %x>' % id(self)
1724
1725 def reset(self):
1726 self.i = 1
1727 self.o = 1
1728 self.buffer = bytearray()
1729
1730 def getstate(self):
1731 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1732 return bytes(self.buffer), i*100 + o
1733
1734 def setstate(self, state):
1735 buffer, io = state
1736 self.buffer = bytearray(buffer)
1737 i, o = divmod(io, 100)
1738 self.i, self.o = i ^ 1, o ^ 1
1739
1740 def decode(self, input, final=False):
1741 output = ''
1742 for b in input:
1743 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001744 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +00001745 if self.buffer:
1746 output += self.process_word()
1747 else:
1748 self.buffer.append(b)
1749 else: # fixed-length, terminate after self.i bytes
1750 self.buffer.append(b)
1751 if len(self.buffer) == self.i:
1752 output += self.process_word()
1753 if final and self.buffer: # EOF terminates the last word
1754 output += self.process_word()
1755 return output
1756
1757 def process_word(self):
1758 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001759 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001760 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001761 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001762 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1763 else:
1764 output = self.buffer.decode('ascii')
1765 if len(output) < self.o:
1766 output += '-'*self.o # pad out with hyphens
1767 if self.o:
1768 output = output[:self.o] # truncate to output length
1769 output += '.'
1770 self.buffer = bytearray()
1771 return output
1772
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001773 codecEnabled = False
1774
1775 @classmethod
1776 def lookupTestDecoder(cls, name):
1777 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001778 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001779 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001780 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001781 incrementalencoder=None,
1782 streamreader=None, streamwriter=None,
1783 incrementaldecoder=cls)
1784
1785# Register the previous decoder for testing.
1786# Disabled by default, tests will enable it.
1787codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1788
1789
Christian Heimes1a6387e2008-03-26 12:49:49 +00001790class StatefulIncrementalDecoderTest(unittest.TestCase):
1791 """
1792 Make sure the StatefulIncrementalDecoder actually works.
1793 """
1794
1795 test_cases = [
1796 # I=1, O=1 (fixed-length input == fixed-length output)
1797 (b'abcd', False, 'a.b.c.d.'),
1798 # I=0, O=0 (variable-length input, variable-length output)
1799 (b'oiabcd', True, 'abcd.'),
1800 # I=0, O=0 (should ignore extra periods)
1801 (b'oi...abcd...', True, 'abcd.'),
1802 # I=0, O=6 (variable-length input, fixed-length output)
1803 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1804 # I=2, O=6 (fixed-length input < fixed-length output)
1805 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1806 # I=6, O=3 (fixed-length input > fixed-length output)
1807 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1808 # I=0, then 3; O=29, then 15 (with longer output)
1809 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1810 'a----------------------------.' +
1811 'b----------------------------.' +
1812 'cde--------------------------.' +
1813 'abcdefghijabcde.' +
1814 'a.b------------.' +
1815 '.c.------------.' +
1816 'd.e------------.' +
1817 'k--------------.' +
1818 'l--------------.' +
1819 'm--------------.')
1820 ]
1821
Antoine Pitrou19690592009-06-12 20:14:08 +00001822 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001823 # Try a few one-shot test cases.
1824 for input, eof, output in self.test_cases:
1825 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001826 self.assertEqual(d.decode(input, eof), output)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001827
1828 # Also test an unfinished decode, followed by forcing EOF.
1829 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001830 self.assertEqual(d.decode(b'oiabcd'), '')
1831 self.assertEqual(d.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001832
1833class TextIOWrapperTest(unittest.TestCase):
1834
1835 def setUp(self):
1836 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1837 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00001838 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001839
1840 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001841 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001842
Antoine Pitrou19690592009-06-12 20:14:08 +00001843 def test_constructor(self):
1844 r = self.BytesIO(b"\xc3\xa9\n\n")
1845 b = self.BufferedReader(r, 1000)
1846 t = self.TextIOWrapper(b)
1847 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti2623a372010-11-21 13:34:58 +00001848 self.assertEqual(t.encoding, "latin1")
1849 self.assertEqual(t.line_buffering, False)
Antoine Pitrou19690592009-06-12 20:14:08 +00001850 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti2623a372010-11-21 13:34:58 +00001851 self.assertEqual(t.encoding, "utf8")
1852 self.assertEqual(t.line_buffering, True)
1853 self.assertEqual("\xe9\n", t.readline())
Antoine Pitrou19690592009-06-12 20:14:08 +00001854 self.assertRaises(TypeError, t.__init__, b, newline=42)
1855 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1856
1857 def test_detach(self):
1858 r = self.BytesIO()
1859 b = self.BufferedWriter(r)
1860 t = self.TextIOWrapper(b)
1861 self.assertIs(t.detach(), b)
1862
1863 t = self.TextIOWrapper(b, encoding="ascii")
1864 t.write("howdy")
1865 self.assertFalse(r.getvalue())
1866 t.detach()
1867 self.assertEqual(r.getvalue(), b"howdy")
1868 self.assertRaises(ValueError, t.detach)
1869
1870 def test_repr(self):
1871 raw = self.BytesIO("hello".encode("utf-8"))
1872 b = self.BufferedReader(raw)
1873 t = self.TextIOWrapper(b, encoding="utf-8")
1874 modname = self.TextIOWrapper.__module__
1875 self.assertEqual(repr(t),
1876 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1877 raw.name = "dummy"
1878 self.assertEqual(repr(t),
1879 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1880 raw.name = b"dummy"
1881 self.assertEqual(repr(t),
1882 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1883
1884 def test_line_buffering(self):
1885 r = self.BytesIO()
1886 b = self.BufferedWriter(r, 1000)
1887 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1888 t.write("X")
Ezio Melotti2623a372010-11-21 13:34:58 +00001889 self.assertEqual(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001890 t.write("Y\nZ")
Ezio Melotti2623a372010-11-21 13:34:58 +00001891 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001892 t.write("A\rB")
Ezio Melotti2623a372010-11-21 13:34:58 +00001893 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001894
Antoine Pitrou19690592009-06-12 20:14:08 +00001895 def test_encoding(self):
1896 # Check the encoding attribute is always set, and valid
1897 b = self.BytesIO()
1898 t = self.TextIOWrapper(b, encoding="utf8")
1899 self.assertEqual(t.encoding, "utf8")
1900 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001901 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001902 codecs.lookup(t.encoding)
1903
1904 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001905 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001906 b = self.BytesIO(b"abc\n\xff\n")
1907 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001908 self.assertRaises(UnicodeError, t.read)
1909 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001910 b = self.BytesIO(b"abc\n\xff\n")
1911 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001912 self.assertRaises(UnicodeError, t.read)
1913 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001914 b = self.BytesIO(b"abc\n\xff\n")
1915 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti2623a372010-11-21 13:34:58 +00001916 self.assertEqual(t.read(), "abc\n\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001917 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001918 b = self.BytesIO(b"abc\n\xff\n")
1919 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti2623a372010-11-21 13:34:58 +00001920 self.assertEqual(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001921
Antoine Pitrou19690592009-06-12 20:14:08 +00001922 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001923 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001924 b = self.BytesIO()
1925 t = self.TextIOWrapper(b, encoding="ascii")
1926 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001927 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001928 b = self.BytesIO()
1929 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1930 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001931 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001932 b = self.BytesIO()
1933 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001934 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001935 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001936 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001937 self.assertEqual(b.getvalue(), b"abcdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001938 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001939 b = self.BytesIO()
1940 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001941 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001942 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001943 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001944 self.assertEqual(b.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001945
Antoine Pitrou19690592009-06-12 20:14:08 +00001946 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001947 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1948
1949 tests = [
1950 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1951 [ '', input_lines ],
1952 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1953 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1954 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1955 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001956 encodings = (
1957 'utf-8', 'latin-1',
1958 'utf-16', 'utf-16-le', 'utf-16-be',
1959 'utf-32', 'utf-32-le', 'utf-32-be',
1960 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00001961
1962 # Try a range of buffer sizes to test the case where \r is the last
1963 # character in TextIOWrapper._pending_line.
1964 for encoding in encodings:
1965 # XXX: str.encode() should return bytes
1966 data = bytes(''.join(input_lines).encode(encoding))
1967 for do_reads in (False, True):
1968 for bufsize in range(1, 10):
1969 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001970 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1971 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00001972 encoding=encoding)
1973 if do_reads:
1974 got_lines = []
1975 while True:
1976 c2 = textio.read(2)
1977 if c2 == '':
1978 break
Ezio Melotti2623a372010-11-21 13:34:58 +00001979 self.assertEqual(len(c2), 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001980 got_lines.append(c2 + textio.readline())
1981 else:
1982 got_lines = list(textio)
1983
1984 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti2623a372010-11-21 13:34:58 +00001985 self.assertEqual(got_line, exp_line)
1986 self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001987
Antoine Pitrou19690592009-06-12 20:14:08 +00001988 def test_newlines_input(self):
1989 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001990 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1991 for newline, expected in [
1992 (None, normalized.decode("ascii").splitlines(True)),
1993 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00001994 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1995 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1996 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00001997 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00001998 buf = self.BytesIO(testdata)
1999 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti2623a372010-11-21 13:34:58 +00002000 self.assertEqual(txt.readlines(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002001 txt.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002002 self.assertEqual(txt.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002003
Antoine Pitrou19690592009-06-12 20:14:08 +00002004 def test_newlines_output(self):
2005 testdict = {
2006 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2007 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2008 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2009 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2010 }
2011 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2012 for newline, expected in tests:
2013 buf = self.BytesIO()
2014 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2015 txt.write("AAA\nB")
2016 txt.write("BB\nCCC\n")
2017 txt.write("X\rY\r\nZ")
2018 txt.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002019 self.assertEqual(buf.closed, False)
2020 self.assertEqual(buf.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00002021
2022 def test_destructor(self):
2023 l = []
2024 base = self.BytesIO
2025 class MyBytesIO(base):
2026 def close(self):
2027 l.append(self.getvalue())
2028 base.close(self)
2029 b = MyBytesIO()
2030 t = self.TextIOWrapper(b, encoding="ascii")
2031 t.write("abc")
2032 del t
2033 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00002034 self.assertEqual([b"abc"], l)
Antoine Pitrou19690592009-06-12 20:14:08 +00002035
2036 def test_override_destructor(self):
2037 record = []
2038 class MyTextIO(self.TextIOWrapper):
2039 def __del__(self):
2040 record.append(1)
2041 try:
2042 f = super(MyTextIO, self).__del__
2043 except AttributeError:
2044 pass
2045 else:
2046 f()
2047 def close(self):
2048 record.append(2)
2049 super(MyTextIO, self).close()
2050 def flush(self):
2051 record.append(3)
2052 super(MyTextIO, self).flush()
2053 b = self.BytesIO()
2054 t = MyTextIO(b, encoding="ascii")
2055 del t
2056 support.gc_collect()
2057 self.assertEqual(record, [1, 2, 3])
2058
2059 def test_error_through_destructor(self):
2060 # Test that the exception state is not modified by a destructor,
2061 # even if close() fails.
2062 rawio = self.CloseFailureIO()
2063 def f():
2064 self.TextIOWrapper(rawio).xyzzy
2065 with support.captured_output("stderr") as s:
2066 self.assertRaises(AttributeError, f)
2067 s = s.getvalue().strip()
2068 if s:
2069 # The destructor *may* have printed an unraisable error, check it
2070 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002071 self.assertTrue(s.startswith("Exception IOError: "), s)
2072 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002073
2074 # Systematic tests of the text I/O API
2075
Antoine Pitrou19690592009-06-12 20:14:08 +00002076 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002077 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2078 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00002079 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002080 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002081 self.assertEqual(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002082 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002083 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002084 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002085 self.assertEqual(f.tell(), 0)
2086 self.assertEqual(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002087 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002088 self.assertEqual(f.seek(0), 0)
2089 self.assertEqual(f.read(None), "abc")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002090 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002091 self.assertEqual(f.read(2), "ab")
2092 self.assertEqual(f.read(1), "c")
2093 self.assertEqual(f.read(1), "")
2094 self.assertEqual(f.read(), "")
2095 self.assertEqual(f.tell(), cookie)
2096 self.assertEqual(f.seek(0), 0)
2097 self.assertEqual(f.seek(0, 2), cookie)
2098 self.assertEqual(f.write("def"), 3)
2099 self.assertEqual(f.seek(cookie), cookie)
2100 self.assertEqual(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002101 if enc.startswith("utf"):
2102 self.multi_line_test(f, enc)
2103 f.close()
2104
2105 def multi_line_test(self, f, enc):
2106 f.seek(0)
2107 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00002108 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002109 wlines = []
2110 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2111 chars = []
2112 for i in range(size):
2113 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00002114 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002115 wlines.append((f.tell(), line))
2116 f.write(line)
2117 f.seek(0)
2118 rlines = []
2119 while True:
2120 pos = f.tell()
2121 line = f.readline()
2122 if not line:
2123 break
2124 rlines.append((pos, line))
Ezio Melotti2623a372010-11-21 13:34:58 +00002125 self.assertEqual(rlines, wlines)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002126
Antoine Pitrou19690592009-06-12 20:14:08 +00002127 def test_telling(self):
2128 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002129 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002130 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002131 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002132 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002133 p2 = f.tell()
2134 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002135 self.assertEqual(f.tell(), p0)
2136 self.assertEqual(f.readline(), "\xff\n")
2137 self.assertEqual(f.tell(), p1)
2138 self.assertEqual(f.readline(), "\xff\n")
2139 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002140 f.seek(0)
2141 for line in f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002142 self.assertEqual(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002143 self.assertRaises(IOError, f.tell)
Ezio Melotti2623a372010-11-21 13:34:58 +00002144 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002145 f.close()
2146
Antoine Pitrou19690592009-06-12 20:14:08 +00002147 def test_seeking(self):
2148 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00002149 prefix_size = chunk_size - 2
2150 u_prefix = "a" * prefix_size
2151 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melotti2623a372010-11-21 13:34:58 +00002152 self.assertEqual(len(u_prefix), len(prefix))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002153 u_suffix = "\u8888\n"
2154 suffix = bytes(u_suffix.encode("utf-8"))
2155 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00002156 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002157 f.write(line*2)
2158 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002159 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002160 s = f.read(prefix_size)
Ezio Melotti2623a372010-11-21 13:34:58 +00002161 self.assertEqual(s, prefix.decode("ascii"))
2162 self.assertEqual(f.tell(), prefix_size)
2163 self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002164
Antoine Pitrou19690592009-06-12 20:14:08 +00002165 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002166 # Regression test for a specific bug
2167 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00002168 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002169 f.write(data)
2170 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002171 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002172 f._CHUNK_SIZE # Just test that it exists
2173 f._CHUNK_SIZE = 2
2174 f.readline()
2175 f.tell()
2176
Antoine Pitrou19690592009-06-12 20:14:08 +00002177 def test_seek_and_tell(self):
2178 #Test seek/tell using the StatefulIncrementalDecoder.
2179 # Make test faster by doing smaller seeks
2180 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00002181
Antoine Pitrou19690592009-06-12 20:14:08 +00002182 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002183 """Tell/seek to various points within a data stream and ensure
2184 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00002185 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00002186 f.write(data)
2187 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002188 f = self.open(support.TESTFN, encoding='test_decoder')
2189 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00002190 decoded = f.read()
2191 f.close()
2192
2193 for i in range(min_pos, len(decoded) + 1): # seek positions
2194 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00002195 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melotti2623a372010-11-21 13:34:58 +00002196 self.assertEqual(f.read(i), decoded[:i])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002197 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002198 self.assertEqual(f.read(j), decoded[i:i + j])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002199 f.seek(cookie)
Ezio Melotti2623a372010-11-21 13:34:58 +00002200 self.assertEqual(f.read(), decoded[i:])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002201 f.close()
2202
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002203 # Enable the test decoder.
2204 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00002205
2206 # Run the tests.
2207 try:
2208 # Try each test case.
2209 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00002210 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002211
2212 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00002213 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2214 offset = CHUNK_SIZE - len(input)//2
2215 prefix = b'.'*offset
2216 # Don't bother seeking into the prefix (takes too long).
2217 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00002218 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002219
2220 # Ensure our test decoder won't interfere with subsequent tests.
2221 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002222 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002223
Antoine Pitrou19690592009-06-12 20:14:08 +00002224 def test_encoded_writes(self):
2225 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002226 tests = ("utf-16",
2227 "utf-16-le",
2228 "utf-16-be",
2229 "utf-32",
2230 "utf-32-le",
2231 "utf-32-be")
2232 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002233 buf = self.BytesIO()
2234 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002235 # Check if the BOM is written only once (see issue1753).
2236 f.write(data)
2237 f.write(data)
2238 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002239 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002240 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002241 self.assertEqual(f.read(), data * 2)
2242 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002243
Antoine Pitrou19690592009-06-12 20:14:08 +00002244 def test_unreadable(self):
2245 class UnReadable(self.BytesIO):
2246 def readable(self):
2247 return False
2248 txt = self.TextIOWrapper(UnReadable())
2249 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002250
Antoine Pitrou19690592009-06-12 20:14:08 +00002251 def test_read_one_by_one(self):
2252 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002253 reads = ""
2254 while True:
2255 c = txt.read(1)
2256 if not c:
2257 break
2258 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002259 self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002260
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002261 def test_readlines(self):
2262 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2263 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2264 txt.seek(0)
2265 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2266 txt.seek(0)
2267 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2268
Christian Heimes1a6387e2008-03-26 12:49:49 +00002269 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002270 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002271 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002272 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002273 reads = ""
2274 while True:
2275 c = txt.read(128)
2276 if not c:
2277 break
2278 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002279 self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002280
Antoine Pitroueadca1d2012-10-16 23:02:27 +02002281 def test_writelines(self):
2282 l = ['ab', 'cd', 'ef']
2283 buf = self.BytesIO()
2284 txt = self.TextIOWrapper(buf)
2285 txt.writelines(l)
2286 txt.flush()
2287 self.assertEqual(buf.getvalue(), b'abcdef')
2288
2289 def test_writelines_userlist(self):
2290 l = UserList(['ab', 'cd', 'ef'])
2291 buf = self.BytesIO()
2292 txt = self.TextIOWrapper(buf)
2293 txt.writelines(l)
2294 txt.flush()
2295 self.assertEqual(buf.getvalue(), b'abcdef')
2296
2297 def test_writelines_error(self):
2298 txt = self.TextIOWrapper(self.BytesIO())
2299 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2300 self.assertRaises(TypeError, txt.writelines, None)
2301 self.assertRaises(TypeError, txt.writelines, b'abc')
2302
Christian Heimes1a6387e2008-03-26 12:49:49 +00002303 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002304 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002305
2306 # read one char at a time
2307 reads = ""
2308 while True:
2309 c = txt.read(1)
2310 if not c:
2311 break
2312 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002313 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002314
2315 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002316 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002317 txt._CHUNK_SIZE = 4
2318
2319 reads = ""
2320 while True:
2321 c = txt.read(4)
2322 if not c:
2323 break
2324 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002325 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002326
2327 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002328 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002329 txt._CHUNK_SIZE = 4
2330
2331 reads = txt.read(4)
2332 reads += txt.read(4)
2333 reads += txt.readline()
2334 reads += txt.readline()
2335 reads += txt.readline()
Ezio Melotti2623a372010-11-21 13:34:58 +00002336 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002337
2338 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002339 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002340 txt._CHUNK_SIZE = 4
2341
2342 reads = txt.read(4)
2343 reads += txt.read()
Ezio Melotti2623a372010-11-21 13:34:58 +00002344 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002345
2346 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002347 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002348 txt._CHUNK_SIZE = 4
2349
2350 reads = txt.read(4)
2351 pos = txt.tell()
2352 txt.seek(0)
2353 txt.seek(pos)
Ezio Melotti2623a372010-11-21 13:34:58 +00002354 self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002355
2356 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002357 buffer = self.BytesIO(self.testdata)
2358 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002359
2360 self.assertEqual(buffer.seekable(), txt.seekable())
2361
Antoine Pitrou19690592009-06-12 20:14:08 +00002362 def test_append_bom(self):
2363 # The BOM is not written again when appending to a non-empty file
2364 filename = support.TESTFN
2365 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2366 with self.open(filename, 'w', encoding=charset) as f:
2367 f.write('aaa')
2368 pos = f.tell()
2369 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002370 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002371
2372 with self.open(filename, 'a', encoding=charset) as f:
2373 f.write('xxx')
2374 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002375 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002376
Antoine Pitrou19690592009-06-12 20:14:08 +00002377 def test_seek_bom(self):
2378 # Same test, but when seeking manually
2379 filename = support.TESTFN
2380 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2381 with self.open(filename, 'w', encoding=charset) as f:
2382 f.write('aaa')
2383 pos = f.tell()
2384 with self.open(filename, 'r+', encoding=charset) as f:
2385 f.seek(pos)
2386 f.write('zzz')
2387 f.seek(0)
2388 f.write('bbb')
2389 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002390 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002391
2392 def test_errors_property(self):
2393 with self.open(support.TESTFN, "w") as f:
2394 self.assertEqual(f.errors, "strict")
2395 with self.open(support.TESTFN, "w", errors="replace") as f:
2396 self.assertEqual(f.errors, "replace")
2397
Victor Stinner6a102812010-04-27 23:55:59 +00002398 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002399 def test_threads_write(self):
2400 # Issue6750: concurrent writes could duplicate data
2401 event = threading.Event()
2402 with self.open(support.TESTFN, "w", buffering=1) as f:
2403 def run(n):
2404 text = "Thread%03d\n" % n
2405 event.wait()
2406 f.write(text)
2407 threads = [threading.Thread(target=lambda n=x: run(n))
2408 for x in range(20)]
2409 for t in threads:
2410 t.start()
2411 time.sleep(0.02)
2412 event.set()
2413 for t in threads:
2414 t.join()
2415 with self.open(support.TESTFN) as f:
2416 content = f.read()
2417 for n in range(20):
Ezio Melotti2623a372010-11-21 13:34:58 +00002418 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002419
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002420 def test_flush_error_on_close(self):
2421 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2422 def bad_flush():
2423 raise IOError()
2424 txt.flush = bad_flush
2425 self.assertRaises(IOError, txt.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -06002426 self.assertTrue(txt.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002427
2428 def test_multi_close(self):
2429 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2430 txt.close()
2431 txt.close()
2432 txt.close()
2433 self.assertRaises(ValueError, txt.flush)
2434
Antoine Pitroufc9ead62010-12-21 21:26:55 +00002435 def test_readonly_attributes(self):
2436 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2437 buf = self.BytesIO(self.testdata)
2438 with self.assertRaises((AttributeError, TypeError)):
2439 txt.buffer = buf
2440
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002441 def test_read_nonbytes(self):
2442 # Issue #17106
2443 # Crash when underlying read() returns non-bytes
2444 class NonbytesStream(self.StringIO):
2445 read1 = self.StringIO.read
2446 class NonbytesStream(self.StringIO):
2447 read1 = self.StringIO.read
2448 t = self.TextIOWrapper(NonbytesStream('a'))
2449 with self.maybeRaises(TypeError):
2450 t.read(1)
2451 t = self.TextIOWrapper(NonbytesStream('a'))
2452 with self.maybeRaises(TypeError):
2453 t.readline()
2454 t = self.TextIOWrapper(NonbytesStream('a'))
2455 self.assertEqual(t.read(), u'a')
2456
2457 def test_illegal_decoder(self):
2458 # Issue #17106
2459 # Crash when decoder returns non-string
2460 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2461 encoding='quopri_codec')
2462 with self.maybeRaises(TypeError):
2463 t.read(1)
2464 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2465 encoding='quopri_codec')
2466 with self.maybeRaises(TypeError):
2467 t.readline()
2468 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2469 encoding='quopri_codec')
2470 with self.maybeRaises(TypeError):
2471 t.read()
2472
2473
Antoine Pitrou19690592009-06-12 20:14:08 +00002474class CTextIOWrapperTest(TextIOWrapperTest):
2475
2476 def test_initialization(self):
2477 r = self.BytesIO(b"\xc3\xa9\n\n")
2478 b = self.BufferedReader(r, 1000)
2479 t = self.TextIOWrapper(b)
2480 self.assertRaises(TypeError, t.__init__, b, newline=42)
2481 self.assertRaises(ValueError, t.read)
2482 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2483 self.assertRaises(ValueError, t.read)
2484
2485 def test_garbage_collection(self):
2486 # C TextIOWrapper objects are collected, and collecting them flushes
2487 # all data to disk.
2488 # The Python version has __del__, so it ends in gc.garbage instead.
2489 rawio = io.FileIO(support.TESTFN, "wb")
2490 b = self.BufferedWriter(rawio)
2491 t = self.TextIOWrapper(b, encoding="ascii")
2492 t.write("456def")
2493 t.x = t
2494 wr = weakref.ref(t)
2495 del t
2496 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002497 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002498 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002499 self.assertEqual(f.read(), b"456def")
2500
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002501 def test_rwpair_cleared_before_textio(self):
2502 # Issue 13070: TextIOWrapper's finalization would crash when called
2503 # after the reference to the underlying BufferedRWPair's writer got
2504 # cleared by the GC.
2505 for i in range(1000):
2506 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2507 t1 = self.TextIOWrapper(b1, encoding="ascii")
2508 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2509 t2 = self.TextIOWrapper(b2, encoding="ascii")
2510 # circular references
2511 t1.buddy = t2
2512 t2.buddy = t1
2513 support.gc_collect()
2514
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002515 maybeRaises = unittest.TestCase.assertRaises
2516
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002517
Antoine Pitrou19690592009-06-12 20:14:08 +00002518class PyTextIOWrapperTest(TextIOWrapperTest):
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002519 @contextlib.contextmanager
2520 def maybeRaises(self, *args, **kwds):
2521 yield
Antoine Pitrou19690592009-06-12 20:14:08 +00002522
2523
2524class IncrementalNewlineDecoderTest(unittest.TestCase):
2525
2526 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002527 # UTF-8 specific tests for a newline decoder
2528 def _check_decode(b, s, **kwargs):
2529 # We exercise getstate() / setstate() as well as decode()
2530 state = decoder.getstate()
Ezio Melotti2623a372010-11-21 13:34:58 +00002531 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002532 decoder.setstate(state)
Ezio Melotti2623a372010-11-21 13:34:58 +00002533 self.assertEqual(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002534
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002535 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002536
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002537 _check_decode(b'\xe8', "")
2538 _check_decode(b'\xa2', "")
2539 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002540
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002541 _check_decode(b'\xe8', "")
2542 _check_decode(b'\xa2', "")
2543 _check_decode(b'\x88', "\u8888")
2544
2545 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002546 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2547
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002548 decoder.reset()
2549 _check_decode(b'\n', "\n")
2550 _check_decode(b'\r', "")
2551 _check_decode(b'', "\n", final=True)
2552 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002553
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002554 _check_decode(b'\r', "")
2555 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002556
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002557 _check_decode(b'\r\r\n', "\n\n")
2558 _check_decode(b'\r', "")
2559 _check_decode(b'\r', "\n")
2560 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002561
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002562 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2563 _check_decode(b'\xe8\xa2\x88', "\u8888")
2564 _check_decode(b'\n', "\n")
2565 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2566 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002567
Antoine Pitrou19690592009-06-12 20:14:08 +00002568 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002569 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002570 if encoding is not None:
2571 encoder = codecs.getincrementalencoder(encoding)()
2572 def _decode_bytewise(s):
2573 # Decode one byte at a time
2574 for b in encoder.encode(s):
2575 result.append(decoder.decode(b))
2576 else:
2577 encoder = None
2578 def _decode_bytewise(s):
2579 # Decode one char at a time
2580 for c in s:
2581 result.append(decoder.decode(c))
Ezio Melotti2623a372010-11-21 13:34:58 +00002582 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002583 _decode_bytewise("abc\n\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002584 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002585 _decode_bytewise("\nabc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002586 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002587 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002588 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002589 _decode_bytewise("abc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002590 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002591 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002592 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002593 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002594 input = "abc"
2595 if encoder is not None:
2596 encoder.reset()
2597 input = encoder.encode(input)
Ezio Melotti2623a372010-11-21 13:34:58 +00002598 self.assertEqual(decoder.decode(input), "abc")
2599 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002600
2601 def test_newline_decoder(self):
2602 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002603 # None meaning the IncrementalNewlineDecoder takes unicode input
2604 # rather than bytes input
2605 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002606 'utf-16', 'utf-16-le', 'utf-16-be',
2607 'utf-32', 'utf-32-le', 'utf-32-be',
2608 )
2609 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002610 decoder = enc and codecs.getincrementaldecoder(enc)()
2611 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2612 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002613 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002614 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2615 self.check_newline_decoding_utf8(decoder)
2616
2617 def test_newline_bytes(self):
2618 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2619 def _check(dec):
Ezio Melotti2623a372010-11-21 13:34:58 +00002620 self.assertEqual(dec.newlines, None)
2621 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2622 self.assertEqual(dec.newlines, None)
2623 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2624 self.assertEqual(dec.newlines, None)
Antoine Pitrou19690592009-06-12 20:14:08 +00002625 dec = self.IncrementalNewlineDecoder(None, translate=False)
2626 _check(dec)
2627 dec = self.IncrementalNewlineDecoder(None, translate=True)
2628 _check(dec)
2629
2630class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2631 pass
2632
2633class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2634 pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002635
Christian Heimes1a6387e2008-03-26 12:49:49 +00002636
2637# XXX Tests for open()
2638
2639class MiscIOTest(unittest.TestCase):
2640
Benjamin Petersonad100c32008-11-20 22:06:22 +00002641 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002642 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002643
Antoine Pitrou19690592009-06-12 20:14:08 +00002644 def test___all__(self):
2645 for name in self.io.__all__:
2646 obj = getattr(self.io, name, None)
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002647 self.assertTrue(obj is not None, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002648 if name == "open":
2649 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002650 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002651 self.assertTrue(issubclass(obj, Exception), name)
2652 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002653 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002654
Benjamin Petersonad100c32008-11-20 22:06:22 +00002655 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002656 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002657 self.assertEqual(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002658 f.close()
2659
Antoine Pitrou19690592009-06-12 20:14:08 +00002660 f = self.open(support.TESTFN, "U")
Ezio Melotti2623a372010-11-21 13:34:58 +00002661 self.assertEqual(f.name, support.TESTFN)
2662 self.assertEqual(f.buffer.name, support.TESTFN)
2663 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2664 self.assertEqual(f.mode, "U")
2665 self.assertEqual(f.buffer.mode, "rb")
2666 self.assertEqual(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002667 f.close()
2668
Antoine Pitrou19690592009-06-12 20:14:08 +00002669 f = self.open(support.TESTFN, "w+")
Ezio Melotti2623a372010-11-21 13:34:58 +00002670 self.assertEqual(f.mode, "w+")
2671 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2672 self.assertEqual(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002673
Antoine Pitrou19690592009-06-12 20:14:08 +00002674 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melotti2623a372010-11-21 13:34:58 +00002675 self.assertEqual(g.mode, "wb")
2676 self.assertEqual(g.raw.mode, "wb")
2677 self.assertEqual(g.name, f.fileno())
2678 self.assertEqual(g.raw.name, f.fileno())
Benjamin Petersonad100c32008-11-20 22:06:22 +00002679 f.close()
2680 g.close()
2681
Antoine Pitrou19690592009-06-12 20:14:08 +00002682 def test_io_after_close(self):
2683 for kwargs in [
2684 {"mode": "w"},
2685 {"mode": "wb"},
2686 {"mode": "w", "buffering": 1},
2687 {"mode": "w", "buffering": 2},
2688 {"mode": "wb", "buffering": 0},
2689 {"mode": "r"},
2690 {"mode": "rb"},
2691 {"mode": "r", "buffering": 1},
2692 {"mode": "r", "buffering": 2},
2693 {"mode": "rb", "buffering": 0},
2694 {"mode": "w+"},
2695 {"mode": "w+b"},
2696 {"mode": "w+", "buffering": 1},
2697 {"mode": "w+", "buffering": 2},
2698 {"mode": "w+b", "buffering": 0},
2699 ]:
2700 f = self.open(support.TESTFN, **kwargs)
2701 f.close()
2702 self.assertRaises(ValueError, f.flush)
2703 self.assertRaises(ValueError, f.fileno)
2704 self.assertRaises(ValueError, f.isatty)
2705 self.assertRaises(ValueError, f.__iter__)
2706 if hasattr(f, "peek"):
2707 self.assertRaises(ValueError, f.peek, 1)
2708 self.assertRaises(ValueError, f.read)
2709 if hasattr(f, "read1"):
2710 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinner5100a402011-05-25 22:15:36 +02002711 if hasattr(f, "readall"):
2712 self.assertRaises(ValueError, f.readall)
Antoine Pitrou19690592009-06-12 20:14:08 +00002713 if hasattr(f, "readinto"):
2714 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2715 self.assertRaises(ValueError, f.readline)
2716 self.assertRaises(ValueError, f.readlines)
2717 self.assertRaises(ValueError, f.seek, 0)
2718 self.assertRaises(ValueError, f.tell)
2719 self.assertRaises(ValueError, f.truncate)
2720 self.assertRaises(ValueError, f.write,
2721 b"" if "b" in kwargs['mode'] else "")
2722 self.assertRaises(ValueError, f.writelines, [])
2723 self.assertRaises(ValueError, next, f)
2724
2725 def test_blockingioerror(self):
2726 # Various BlockingIOError issues
2727 self.assertRaises(TypeError, self.BlockingIOError)
2728 self.assertRaises(TypeError, self.BlockingIOError, 1)
2729 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2730 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2731 b = self.BlockingIOError(1, "")
2732 self.assertEqual(b.characters_written, 0)
2733 class C(unicode):
2734 pass
2735 c = C("")
2736 b = self.BlockingIOError(1, c)
2737 c.b = b
2738 b.c = c
2739 wr = weakref.ref(c)
2740 del c, b
2741 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002742 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002743
2744 def test_abcs(self):
2745 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002746 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2747 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2748 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2749 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002750
2751 def _check_abc_inheritance(self, abcmodule):
2752 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002753 self.assertIsInstance(f, abcmodule.IOBase)
2754 self.assertIsInstance(f, abcmodule.RawIOBase)
2755 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2756 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002757 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002758 self.assertIsInstance(f, abcmodule.IOBase)
2759 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2760 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2761 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002762 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002763 self.assertIsInstance(f, abcmodule.IOBase)
2764 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2765 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2766 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002767
2768 def test_abc_inheritance(self):
2769 # Test implementations inherit from their respective ABCs
2770 self._check_abc_inheritance(self)
2771
2772 def test_abc_inheritance_official(self):
2773 # Test implementations inherit from the official ABCs of the
2774 # baseline "io" module.
2775 self._check_abc_inheritance(io)
2776
Antoine Pitrou5aa7df32011-11-21 20:16:44 +01002777 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2778 def test_nonblock_pipe_write_bigbuf(self):
2779 self._test_nonblock_pipe_write(16*1024)
2780
2781 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2782 def test_nonblock_pipe_write_smallbuf(self):
2783 self._test_nonblock_pipe_write(1024)
2784
2785 def _set_non_blocking(self, fd):
2786 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2787 self.assertNotEqual(flags, -1)
2788 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2789 self.assertEqual(res, 0)
2790
2791 def _test_nonblock_pipe_write(self, bufsize):
2792 sent = []
2793 received = []
2794 r, w = os.pipe()
2795 self._set_non_blocking(r)
2796 self._set_non_blocking(w)
2797
2798 # To exercise all code paths in the C implementation we need
2799 # to play with buffer sizes. For instance, if we choose a
2800 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2801 # then we will never get a partial write of the buffer.
2802 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2803 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2804
2805 with rf, wf:
2806 for N in 9999, 73, 7574:
2807 try:
2808 i = 0
2809 while True:
2810 msg = bytes([i % 26 + 97]) * N
2811 sent.append(msg)
2812 wf.write(msg)
2813 i += 1
2814
2815 except self.BlockingIOError as e:
2816 self.assertEqual(e.args[0], errno.EAGAIN)
2817 sent[-1] = sent[-1][:e.characters_written]
2818 received.append(rf.read())
2819 msg = b'BLOCKED'
2820 wf.write(msg)
2821 sent.append(msg)
2822
2823 while True:
2824 try:
2825 wf.flush()
2826 break
2827 except self.BlockingIOError as e:
2828 self.assertEqual(e.args[0], errno.EAGAIN)
2829 self.assertEqual(e.characters_written, 0)
2830 received.append(rf.read())
2831
2832 received += iter(rf.read, None)
2833
2834 sent, received = b''.join(sent), b''.join(received)
2835 self.assertTrue(sent == received)
2836 self.assertTrue(wf.closed)
2837 self.assertTrue(rf.closed)
2838
Antoine Pitrou19690592009-06-12 20:14:08 +00002839class CMiscIOTest(MiscIOTest):
2840 io = io
2841
2842class PyMiscIOTest(MiscIOTest):
2843 io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002844
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002845
2846@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2847class SignalsTest(unittest.TestCase):
2848
2849 def setUp(self):
2850 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2851
2852 def tearDown(self):
2853 signal.signal(signal.SIGALRM, self.oldalrm)
2854
2855 def alarm_interrupt(self, sig, frame):
Florent Xicluna3fa3b002010-09-13 08:20:19 +00002856 1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002857
2858 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinner49d495f2011-07-04 11:44:46 +02002859 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
2860 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002861 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2862 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou6439c002011-02-25 21:35:47 +00002863 invokes the signal handler, and bubbles up the exception raised
2864 in the latter."""
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002865 read_results = []
2866 def _read():
2867 s = os.read(r, 1)
2868 read_results.append(s)
2869 t = threading.Thread(target=_read)
2870 t.daemon = True
2871 r, w = os.pipe()
2872 try:
2873 wio = self.io.open(w, **fdopen_kwargs)
2874 t.start()
2875 signal.alarm(1)
2876 # Fill the pipe enough that the write will be blocking.
2877 # It will be interrupted by the timer armed above. Since the
2878 # other thread has read one byte, the low-level write will
2879 # return with a successful (partial) result rather than an EINTR.
2880 # The buffered IO layer must check for pending signal
2881 # handlers, which in this case will invoke alarm_interrupt().
2882 self.assertRaises(ZeroDivisionError,
2883 wio.write, item * (1024 * 1024))
2884 t.join()
2885 # We got one byte, get another one and check that it isn't a
2886 # repeat of the first one.
2887 read_results.append(os.read(r, 1))
2888 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2889 finally:
2890 os.close(w)
2891 os.close(r)
2892 # This is deliberate. If we didn't close the file descriptor
2893 # before closing wio, wio would try to flush its internal
2894 # buffer, and block again.
2895 try:
2896 wio.close()
2897 except IOError as e:
2898 if e.errno != errno.EBADF:
2899 raise
2900
2901 def test_interrupted_write_unbuffered(self):
2902 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2903
2904 def test_interrupted_write_buffered(self):
2905 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2906
2907 def test_interrupted_write_text(self):
2908 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2909
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002910 def check_reentrant_write(self, data, **fdopen_kwargs):
2911 def on_alarm(*args):
2912 # Will be called reentrantly from the same thread
2913 wio.write(data)
Victor Stinner4c41f842011-07-05 13:29:26 +02002914 1//0
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002915 signal.signal(signal.SIGALRM, on_alarm)
2916 r, w = os.pipe()
2917 wio = self.io.open(w, **fdopen_kwargs)
2918 try:
2919 signal.alarm(1)
2920 # Either the reentrant call to wio.write() fails with RuntimeError,
2921 # or the signal handler raises ZeroDivisionError.
2922 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2923 while 1:
2924 for i in range(100):
2925 wio.write(data)
2926 wio.flush()
2927 # Make sure the buffer doesn't fill up and block further writes
2928 os.read(r, len(data) * 100)
2929 exc = cm.exception
2930 if isinstance(exc, RuntimeError):
2931 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2932 finally:
2933 wio.close()
2934 os.close(r)
2935
2936 def test_reentrant_write_buffered(self):
2937 self.check_reentrant_write(b"xy", mode="wb")
2938
2939 def test_reentrant_write_text(self):
2940 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2941
Antoine Pitrou6439c002011-02-25 21:35:47 +00002942 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2943 """Check that a buffered read, when it gets interrupted (either
2944 returning a partial result or EINTR), properly invokes the signal
2945 handler and retries if the latter returned successfully."""
2946 r, w = os.pipe()
2947 fdopen_kwargs["closefd"] = False
2948 def alarm_handler(sig, frame):
2949 os.write(w, b"bar")
2950 signal.signal(signal.SIGALRM, alarm_handler)
2951 try:
2952 rio = self.io.open(r, **fdopen_kwargs)
2953 os.write(w, b"foo")
2954 signal.alarm(1)
2955 # Expected behaviour:
2956 # - first raw read() returns partial b"foo"
2957 # - second raw read() returns EINTR
2958 # - third raw read() returns b"bar"
2959 self.assertEqual(decode(rio.read(6)), "foobar")
2960 finally:
2961 rio.close()
2962 os.close(w)
2963 os.close(r)
2964
2965 def test_interrupterd_read_retry_buffered(self):
2966 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
2967 mode="rb")
2968
2969 def test_interrupterd_read_retry_text(self):
2970 self.check_interrupted_read_retry(lambda x: x,
2971 mode="r")
2972
2973 @unittest.skipUnless(threading, 'Threading required for this test.')
2974 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
2975 """Check that a buffered write, when it gets interrupted (either
2976 returning a partial result or EINTR), properly invokes the signal
2977 handler and retries if the latter returned successfully."""
2978 select = support.import_module("select")
2979 # A quantity that exceeds the buffer size of an anonymous pipe's
2980 # write end.
2981 N = 1024 * 1024
2982 r, w = os.pipe()
2983 fdopen_kwargs["closefd"] = False
2984 # We need a separate thread to read from the pipe and allow the
2985 # write() to finish. This thread is started after the SIGALRM is
2986 # received (forcing a first EINTR in write()).
2987 read_results = []
2988 write_finished = False
2989 def _read():
2990 while not write_finished:
2991 while r in select.select([r], [], [], 1.0)[0]:
2992 s = os.read(r, 1024)
2993 read_results.append(s)
2994 t = threading.Thread(target=_read)
2995 t.daemon = True
2996 def alarm1(sig, frame):
2997 signal.signal(signal.SIGALRM, alarm2)
2998 signal.alarm(1)
2999 def alarm2(sig, frame):
3000 t.start()
3001 signal.signal(signal.SIGALRM, alarm1)
3002 try:
3003 wio = self.io.open(w, **fdopen_kwargs)
3004 signal.alarm(1)
3005 # Expected behaviour:
3006 # - first raw write() is partial (because of the limited pipe buffer
3007 # and the first alarm)
3008 # - second raw write() returns EINTR (because of the second alarm)
3009 # - subsequent write()s are successful (either partial or complete)
3010 self.assertEqual(N, wio.write(item * N))
3011 wio.flush()
3012 write_finished = True
3013 t.join()
3014 self.assertEqual(N, sum(len(x) for x in read_results))
3015 finally:
3016 write_finished = True
3017 os.close(w)
3018 os.close(r)
3019 # This is deliberate. If we didn't close the file descriptor
3020 # before closing wio, wio would try to flush its internal
3021 # buffer, and could block (in case of failure).
3022 try:
3023 wio.close()
3024 except IOError as e:
3025 if e.errno != errno.EBADF:
3026 raise
3027
3028 def test_interrupterd_write_retry_buffered(self):
3029 self.check_interrupted_write_retry(b"x", mode="wb")
3030
3031 def test_interrupterd_write_retry_text(self):
3032 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3033
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003034
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003035class CSignalsTest(SignalsTest):
3036 io = io
3037
3038class PySignalsTest(SignalsTest):
3039 io = pyio
3040
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003041 # Handling reentrancy issues would slow down _pyio even more, so the
3042 # tests are disabled.
3043 test_reentrant_write_buffered = None
3044 test_reentrant_write_text = None
3045
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003046
Christian Heimes1a6387e2008-03-26 12:49:49 +00003047def test_main():
Antoine Pitrou19690592009-06-12 20:14:08 +00003048 tests = (CIOTest, PyIOTest,
3049 CBufferedReaderTest, PyBufferedReaderTest,
3050 CBufferedWriterTest, PyBufferedWriterTest,
3051 CBufferedRWPairTest, PyBufferedRWPairTest,
3052 CBufferedRandomTest, PyBufferedRandomTest,
3053 StatefulIncrementalDecoderTest,
3054 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3055 CTextIOWrapperTest, PyTextIOWrapperTest,
3056 CMiscIOTest, PyMiscIOTest,
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003057 CSignalsTest, PySignalsTest,
Antoine Pitrou19690592009-06-12 20:14:08 +00003058 )
3059
3060 # Put the namespaces of the IO module we are testing and some useful mock
3061 # classes in the __dict__ of each test.
3062 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou6391b342010-09-14 18:48:19 +00003063 MockNonBlockWriterIO, MockRawIOWithoutRead)
Antoine Pitrou19690592009-06-12 20:14:08 +00003064 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3065 c_io_ns = dict((name, getattr(io, name)) for name in all_members)
3066 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
3067 globs = globals()
3068 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3069 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3070 # Avoid turning open into a bound method.
3071 py_io_ns["open"] = pyio.OpenWrapper
3072 for test in tests:
3073 if test.__name__.startswith("C"):
3074 for name, obj in c_io_ns.items():
3075 setattr(test, name, obj)
3076 elif test.__name__.startswith("Py"):
3077 for name, obj in py_io_ns.items():
3078 setattr(test, name, obj)
3079
3080 support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00003081
3082if __name__ == "__main__":
Antoine Pitrou19690592009-06-12 20:14:08 +00003083 test_main()