blob: aabc1d23dd63178f2c3de5f374586c22ea8ba1a6 [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Antoine Pitrou19690592009-06-12 20:14:08 +000032import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000033import signal
34import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000035from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000036from collections import deque
Antoine Pitrou78e761e2012-10-16 22:57:11 +020037from UserList import UserList
Antoine Pitrou19690592009-06-12 20:14:08 +000038from test import test_support as support
Serhiy Storchaka354d50e2013-02-03 17:10:42 +020039import contextlib
Christian Heimes1a6387e2008-03-26 12:49:49 +000040
41import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000042import io # C implementation of io
43import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000044try:
45 import threading
46except ImportError:
47 threading = None
Antoine Pitrou5aa7df32011-11-21 20:16:44 +010048try:
49 import fcntl
50except ImportError:
51 fcntl = None
Antoine Pitrou19690592009-06-12 20:14:08 +000052
53__metaclass__ = type
54bytes = support.py3k_bytes
55
56def _default_chunk_size():
57 """Get the default TextIOWrapper chunk size"""
58 with io.open(__file__, "r", encoding="latin1") as f:
59 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000060
61
Antoine Pitrou6391b342010-09-14 18:48:19 +000062class MockRawIOWithoutRead:
63 """A RawIO implementation without read(), so as to exercise the default
64 RawIO.read() which calls readinto()."""
Christian Heimes1a6387e2008-03-26 12:49:49 +000065
66 def __init__(self, read_stack=()):
67 self._read_stack = list(read_stack)
68 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +000069 self._reads = 0
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000070 self._extraneous_reads = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +000071
Christian Heimes1a6387e2008-03-26 12:49:49 +000072 def write(self, b):
Antoine Pitrou19690592009-06-12 20:14:08 +000073 self._write_stack.append(bytes(b))
Christian Heimes1a6387e2008-03-26 12:49:49 +000074 return len(b)
75
76 def writable(self):
77 return True
78
79 def fileno(self):
80 return 42
81
82 def readable(self):
83 return True
84
85 def seekable(self):
86 return True
87
88 def seek(self, pos, whence):
Antoine Pitrou19690592009-06-12 20:14:08 +000089 return 0 # wrong but we gotta return something
Christian Heimes1a6387e2008-03-26 12:49:49 +000090
91 def tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +000092 return 0 # same comment as above
93
94 def readinto(self, buf):
95 self._reads += 1
96 max_len = len(buf)
97 try:
98 data = self._read_stack[0]
99 except IndexError:
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000100 self._extraneous_reads += 1
Antoine Pitrou19690592009-06-12 20:14:08 +0000101 return 0
102 if data is None:
103 del self._read_stack[0]
104 return None
105 n = len(data)
106 if len(data) <= max_len:
107 del self._read_stack[0]
108 buf[:n] = data
109 return n
110 else:
111 buf[:] = data[:max_len]
112 self._read_stack[0] = data[max_len:]
113 return max_len
114
115 def truncate(self, pos=None):
116 return pos
117
Antoine Pitrou6391b342010-09-14 18:48:19 +0000118class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
119 pass
120
121class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
122 pass
123
124
125class MockRawIO(MockRawIOWithoutRead):
126
127 def read(self, n=None):
128 self._reads += 1
129 try:
130 return self._read_stack.pop(0)
131 except:
132 self._extraneous_reads += 1
133 return b""
134
Antoine Pitrou19690592009-06-12 20:14:08 +0000135class CMockRawIO(MockRawIO, io.RawIOBase):
136 pass
137
138class PyMockRawIO(MockRawIO, pyio.RawIOBase):
139 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000140
141
Antoine Pitrou19690592009-06-12 20:14:08 +0000142class MisbehavedRawIO(MockRawIO):
143 def write(self, b):
144 return MockRawIO.write(self, b) * 2
145
146 def read(self, n=None):
147 return MockRawIO.read(self, n) * 2
148
149 def seek(self, pos, whence):
150 return -123
151
152 def tell(self):
153 return -456
154
155 def readinto(self, buf):
156 MockRawIO.readinto(self, buf)
157 return len(buf) * 5
158
159class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
160 pass
161
162class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
163 pass
164
165
166class CloseFailureIO(MockRawIO):
167 closed = 0
168
169 def close(self):
170 if not self.closed:
171 self.closed = 1
172 raise IOError
173
174class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
175 pass
176
177class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
178 pass
179
180
181class MockFileIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000182
183 def __init__(self, data):
184 self.read_history = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000185 super(MockFileIO, self).__init__(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000186
187 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +0000188 res = super(MockFileIO, self).read(n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000189 self.read_history.append(None if res is None else len(res))
190 return res
191
Antoine Pitrou19690592009-06-12 20:14:08 +0000192 def readinto(self, b):
193 res = super(MockFileIO, self).readinto(b)
194 self.read_history.append(res)
195 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000196
Antoine Pitrou19690592009-06-12 20:14:08 +0000197class CMockFileIO(MockFileIO, io.BytesIO):
198 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000199
Antoine Pitrou19690592009-06-12 20:14:08 +0000200class PyMockFileIO(MockFileIO, pyio.BytesIO):
201 pass
202
203
204class MockNonBlockWriterIO:
205
206 def __init__(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000207 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000208 self._blocker_char = None
Christian Heimes1a6387e2008-03-26 12:49:49 +0000209
Antoine Pitrou19690592009-06-12 20:14:08 +0000210 def pop_written(self):
211 s = b"".join(self._write_stack)
212 self._write_stack[:] = []
213 return s
214
215 def block_on(self, char):
216 """Block when a given char is encountered."""
217 self._blocker_char = char
218
219 def readable(self):
220 return True
221
222 def seekable(self):
223 return True
Christian Heimes1a6387e2008-03-26 12:49:49 +0000224
225 def writable(self):
226 return True
227
Antoine Pitrou19690592009-06-12 20:14:08 +0000228 def write(self, b):
229 b = bytes(b)
230 n = -1
231 if self._blocker_char:
232 try:
233 n = b.index(self._blocker_char)
234 except ValueError:
235 pass
236 else:
Antoine Pitrou5aa7df32011-11-21 20:16:44 +0100237 if n > 0:
238 # write data up to the first blocker
239 self._write_stack.append(b[:n])
240 return n
241 else:
242 # cancel blocker and indicate would block
243 self._blocker_char = None
244 return None
Antoine Pitrou19690592009-06-12 20:14:08 +0000245 self._write_stack.append(b)
246 return len(b)
247
248class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
249 BlockingIOError = io.BlockingIOError
250
251class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
252 BlockingIOError = pyio.BlockingIOError
253
Christian Heimes1a6387e2008-03-26 12:49:49 +0000254
255class IOTest(unittest.TestCase):
256
Antoine Pitrou19690592009-06-12 20:14:08 +0000257 def setUp(self):
258 support.unlink(support.TESTFN)
259
Christian Heimes1a6387e2008-03-26 12:49:49 +0000260 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000261 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000262
263 def write_ops(self, f):
264 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000265 f.truncate(0)
266 self.assertEqual(f.tell(), 5)
267 f.seek(0)
268
269 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000270 self.assertEqual(f.seek(0), 0)
271 self.assertEqual(f.write(b"Hello."), 6)
272 self.assertEqual(f.tell(), 6)
273 self.assertEqual(f.seek(-1, 1), 5)
274 self.assertEqual(f.tell(), 5)
275 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
276 self.assertEqual(f.seek(0), 0)
277 self.assertEqual(f.write(b"h"), 1)
278 self.assertEqual(f.seek(-1, 2), 13)
279 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000280
Christian Heimes1a6387e2008-03-26 12:49:49 +0000281 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000282 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000283 self.assertRaises(TypeError, f.seek, 0.0)
284
285 def read_ops(self, f, buffered=False):
286 data = f.read(5)
287 self.assertEqual(data, b"hello")
288 data = bytearray(data)
289 self.assertEqual(f.readinto(data), 5)
290 self.assertEqual(data, b" worl")
291 self.assertEqual(f.readinto(data), 2)
292 self.assertEqual(len(data), 5)
293 self.assertEqual(data[:2], b"d\n")
294 self.assertEqual(f.seek(0), 0)
295 self.assertEqual(f.read(20), b"hello world\n")
296 self.assertEqual(f.read(1), b"")
297 self.assertEqual(f.readinto(bytearray(b"x")), 0)
298 self.assertEqual(f.seek(-6, 2), 6)
299 self.assertEqual(f.read(5), b"world")
300 self.assertEqual(f.read(0), b"")
301 self.assertEqual(f.readinto(bytearray()), 0)
302 self.assertEqual(f.seek(-6, 1), 5)
303 self.assertEqual(f.read(5), b" worl")
304 self.assertEqual(f.tell(), 10)
305 self.assertRaises(TypeError, f.seek, 0.0)
306 if buffered:
307 f.seek(0)
308 self.assertEqual(f.read(), b"hello world\n")
309 f.seek(6)
310 self.assertEqual(f.read(), b"world\n")
311 self.assertEqual(f.read(), b"")
312
313 LARGE = 2**31
314
315 def large_file_ops(self, f):
316 assert f.readable()
317 assert f.writable()
318 self.assertEqual(f.seek(self.LARGE), self.LARGE)
319 self.assertEqual(f.tell(), self.LARGE)
320 self.assertEqual(f.write(b"xxx"), 3)
321 self.assertEqual(f.tell(), self.LARGE + 3)
322 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
323 self.assertEqual(f.truncate(), self.LARGE + 2)
324 self.assertEqual(f.tell(), self.LARGE + 2)
325 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
326 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000327 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000328 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
329 self.assertEqual(f.seek(-1, 2), self.LARGE)
330 self.assertEqual(f.read(2), b"x")
331
Antoine Pitrou19690592009-06-12 20:14:08 +0000332 def test_invalid_operations(self):
333 # Try writing on a file opened in read mode and vice-versa.
334 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000335 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000336 self.assertRaises(IOError, fp.read)
337 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000338 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000339 self.assertRaises(IOError, fp.write, b"blah")
340 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000341 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000342 self.assertRaises(IOError, fp.write, "blah")
343 self.assertRaises(IOError, fp.writelines, ["blah\n"])
344
Christian Heimes1a6387e2008-03-26 12:49:49 +0000345 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000346 with self.open(support.TESTFN, "wb", buffering=0) as f:
347 self.assertEqual(f.readable(), False)
348 self.assertEqual(f.writable(), True)
349 self.assertEqual(f.seekable(), True)
350 self.write_ops(f)
351 with self.open(support.TESTFN, "rb", buffering=0) as f:
352 self.assertEqual(f.readable(), True)
353 self.assertEqual(f.writable(), False)
354 self.assertEqual(f.seekable(), True)
355 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000356
357 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000358 with self.open(support.TESTFN, "wb") as f:
359 self.assertEqual(f.readable(), False)
360 self.assertEqual(f.writable(), True)
361 self.assertEqual(f.seekable(), True)
362 self.write_ops(f)
363 with self.open(support.TESTFN, "rb") as f:
364 self.assertEqual(f.readable(), True)
365 self.assertEqual(f.writable(), False)
366 self.assertEqual(f.seekable(), True)
367 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000368
369 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000370 with self.open(support.TESTFN, "wb") as f:
371 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
372 with self.open(support.TESTFN, "rb") as f:
373 self.assertEqual(f.readline(), b"abc\n")
374 self.assertEqual(f.readline(10), b"def\n")
375 self.assertEqual(f.readline(2), b"xy")
376 self.assertEqual(f.readline(4), b"zzy\n")
377 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000378 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000379 self.assertRaises(TypeError, f.readline, 5.3)
380 with self.open(support.TESTFN, "r") as f:
381 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000382
383 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000384 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000385 self.write_ops(f)
386 data = f.getvalue()
387 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000388 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000389 self.read_ops(f, True)
390
391 def test_large_file_ops(self):
392 # On Windows and Mac OSX this test comsumes large resources; It takes
393 # a long time to build the >2GB file and takes >2GB of disk space
394 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000395 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
396 if not support.is_resource_enabled("largefile"):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000397 print("\nTesting large file ops skipped on %s." % sys.platform,
398 file=sys.stderr)
399 print("It requires %d bytes and a long time." % self.LARGE,
400 file=sys.stderr)
401 print("Use 'regrtest.py -u largefile test_io' to run it.",
402 file=sys.stderr)
403 return
Antoine Pitrou19690592009-06-12 20:14:08 +0000404 with self.open(support.TESTFN, "w+b", 0) as f:
405 self.large_file_ops(f)
406 with self.open(support.TESTFN, "w+b") as f:
407 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000408
409 def test_with_open(self):
410 for bufsize in (0, 1, 100):
411 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000412 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000413 f.write(b"xxx")
414 self.assertEqual(f.closed, True)
415 f = None
416 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000417 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000418 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000419 except ZeroDivisionError:
420 self.assertEqual(f.closed, True)
421 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000422 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000423
Antoine Pitroue741cc62009-01-21 00:45:36 +0000424 # issue 5008
425 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000426 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000427 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000428 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000429 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000430 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000431 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000432 with self.open(support.TESTFN, "a") as f:
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000433 self.assertTrue(f.tell() > 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000434
Christian Heimes1a6387e2008-03-26 12:49:49 +0000435 def test_destructor(self):
436 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000437 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000438 def __del__(self):
439 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000440 try:
441 f = super(MyFileIO, self).__del__
442 except AttributeError:
443 pass
444 else:
445 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000446 def close(self):
447 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000448 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000449 def flush(self):
450 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000451 super(MyFileIO, self).flush()
452 f = MyFileIO(support.TESTFN, "wb")
453 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000454 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000455 support.gc_collect()
456 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000457 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000458 self.assertEqual(f.read(), b"xxx")
459
460 def _check_base_destructor(self, base):
461 record = []
462 class MyIO(base):
463 def __init__(self):
464 # This exercises the availability of attributes on object
465 # destruction.
466 # (in the C version, close() is called by the tp_dealloc
467 # function, not by __del__)
468 self.on_del = 1
469 self.on_close = 2
470 self.on_flush = 3
471 def __del__(self):
472 record.append(self.on_del)
473 try:
474 f = super(MyIO, self).__del__
475 except AttributeError:
476 pass
477 else:
478 f()
479 def close(self):
480 record.append(self.on_close)
481 super(MyIO, self).close()
482 def flush(self):
483 record.append(self.on_flush)
484 super(MyIO, self).flush()
485 f = MyIO()
486 del f
487 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000488 self.assertEqual(record, [1, 2, 3])
489
Antoine Pitrou19690592009-06-12 20:14:08 +0000490 def test_IOBase_destructor(self):
491 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000492
Antoine Pitrou19690592009-06-12 20:14:08 +0000493 def test_RawIOBase_destructor(self):
494 self._check_base_destructor(self.RawIOBase)
495
496 def test_BufferedIOBase_destructor(self):
497 self._check_base_destructor(self.BufferedIOBase)
498
499 def test_TextIOBase_destructor(self):
500 self._check_base_destructor(self.TextIOBase)
501
502 def test_close_flushes(self):
503 with self.open(support.TESTFN, "wb") as f:
504 f.write(b"xxx")
505 with self.open(support.TESTFN, "rb") as f:
506 self.assertEqual(f.read(), b"xxx")
507
508 def test_array_writes(self):
509 a = array.array(b'i', range(10))
510 n = len(a.tostring())
511 with self.open(support.TESTFN, "wb", 0) as f:
512 self.assertEqual(f.write(a), n)
513 with self.open(support.TESTFN, "wb") as f:
514 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000515
516 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000517 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000518 closefd=False)
519
Antoine Pitrou19690592009-06-12 20:14:08 +0000520 def test_read_closed(self):
521 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000522 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000523 with self.open(support.TESTFN, "r") as f:
524 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000525 self.assertEqual(file.read(), "egg\n")
526 file.seek(0)
527 file.close()
528 self.assertRaises(ValueError, file.read)
529
530 def test_no_closefd_with_filename(self):
531 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000532 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000533
534 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000535 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000536 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000537 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000538 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000539 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000540 self.assertEqual(file.buffer.raw.closefd, False)
541
Antoine Pitrou19690592009-06-12 20:14:08 +0000542 def test_garbage_collection(self):
543 # FileIO objects are collected, and collecting them flushes
544 # all data to disk.
545 f = self.FileIO(support.TESTFN, "wb")
546 f.write(b"abcxxx")
547 f.f = f
548 wr = weakref.ref(f)
549 del f
550 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000551 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000552 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000553 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000554
Antoine Pitrou19690592009-06-12 20:14:08 +0000555 def test_unbounded_file(self):
556 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
557 zero = "/dev/zero"
558 if not os.path.exists(zero):
559 self.skipTest("{0} does not exist".format(zero))
560 if sys.maxsize > 0x7FFFFFFF:
561 self.skipTest("test can only run in a 32-bit address space")
562 if support.real_max_memuse < support._2G:
563 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000564 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000565 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000566 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000567 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000568 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000569 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000570
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000571 def test_flush_error_on_close(self):
572 f = self.open(support.TESTFN, "wb", buffering=0)
573 def bad_flush():
574 raise IOError()
575 f.flush = bad_flush
576 self.assertRaises(IOError, f.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600577 self.assertTrue(f.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000578
579 def test_multi_close(self):
580 f = self.open(support.TESTFN, "wb", buffering=0)
581 f.close()
582 f.close()
583 f.close()
584 self.assertRaises(ValueError, f.flush)
585
Antoine Pitrou6391b342010-09-14 18:48:19 +0000586 def test_RawIOBase_read(self):
587 # Exercise the default RawIOBase.read() implementation (which calls
588 # readinto() internally).
589 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
590 self.assertEqual(rawio.read(2), b"ab")
591 self.assertEqual(rawio.read(2), b"c")
592 self.assertEqual(rawio.read(2), b"d")
593 self.assertEqual(rawio.read(2), None)
594 self.assertEqual(rawio.read(2), b"ef")
595 self.assertEqual(rawio.read(2), b"g")
596 self.assertEqual(rawio.read(2), None)
597 self.assertEqual(rawio.read(2), b"")
598
Hynek Schlawack877effc2012-05-25 09:24:18 +0200599 def test_fileio_closefd(self):
600 # Issue #4841
601 with self.open(__file__, 'rb') as f1, \
602 self.open(__file__, 'rb') as f2:
603 fileio = self.FileIO(f1.fileno(), closefd=False)
604 # .__init__() must not close f1
605 fileio.__init__(f2.fileno(), closefd=False)
606 f1.readline()
607 # .close() must not close f2
608 fileio.close()
609 f2.readline()
610
611
Antoine Pitrou19690592009-06-12 20:14:08 +0000612class CIOTest(IOTest):
Antoine Pitrou16166452011-07-12 22:04:20 +0200613
614 def test_IOBase_finalize(self):
615 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
616 # class which inherits IOBase and an object of this class are caught
617 # in a reference cycle and close() is already in the method cache.
618 class MyIO(self.IOBase):
619 def close(self):
620 pass
621
622 # create an instance to populate the method cache
623 MyIO()
624 obj = MyIO()
625 obj.obj = obj
626 wr = weakref.ref(obj)
627 del MyIO
628 del obj
629 support.gc_collect()
630 self.assertTrue(wr() is None, wr)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000631
Antoine Pitrou19690592009-06-12 20:14:08 +0000632class PyIOTest(IOTest):
633 test_array_writes = unittest.skip(
634 "len(array.array) returns number of elements rather than bytelength"
635 )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000636
637
Antoine Pitrou19690592009-06-12 20:14:08 +0000638class CommonBufferedTests:
639 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
640
641 def test_detach(self):
642 raw = self.MockRawIO()
643 buf = self.tp(raw)
644 self.assertIs(buf.detach(), raw)
645 self.assertRaises(ValueError, buf.detach)
646
647 def test_fileno(self):
648 rawio = self.MockRawIO()
649 bufio = self.tp(rawio)
650
Ezio Melotti2623a372010-11-21 13:34:58 +0000651 self.assertEqual(42, bufio.fileno())
Antoine Pitrou19690592009-06-12 20:14:08 +0000652
653 def test_no_fileno(self):
654 # XXX will we always have fileno() function? If so, kill
655 # this test. Else, write it.
656 pass
657
658 def test_invalid_args(self):
659 rawio = self.MockRawIO()
660 bufio = self.tp(rawio)
661 # Invalid whence
662 self.assertRaises(ValueError, bufio.seek, 0, -1)
663 self.assertRaises(ValueError, bufio.seek, 0, 3)
664
665 def test_override_destructor(self):
666 tp = self.tp
667 record = []
668 class MyBufferedIO(tp):
669 def __del__(self):
670 record.append(1)
671 try:
672 f = super(MyBufferedIO, self).__del__
673 except AttributeError:
674 pass
675 else:
676 f()
677 def close(self):
678 record.append(2)
679 super(MyBufferedIO, self).close()
680 def flush(self):
681 record.append(3)
682 super(MyBufferedIO, self).flush()
683 rawio = self.MockRawIO()
684 bufio = MyBufferedIO(rawio)
685 writable = bufio.writable()
686 del bufio
687 support.gc_collect()
688 if writable:
689 self.assertEqual(record, [1, 2, 3])
690 else:
691 self.assertEqual(record, [1, 2])
692
693 def test_context_manager(self):
694 # Test usability as a context manager
695 rawio = self.MockRawIO()
696 bufio = self.tp(rawio)
697 def _with():
698 with bufio:
699 pass
700 _with()
701 # bufio should now be closed, and using it a second time should raise
702 # a ValueError.
703 self.assertRaises(ValueError, _with)
704
705 def test_error_through_destructor(self):
706 # Test that the exception state is not modified by a destructor,
707 # even if close() fails.
708 rawio = self.CloseFailureIO()
709 def f():
710 self.tp(rawio).xyzzy
711 with support.captured_output("stderr") as s:
712 self.assertRaises(AttributeError, f)
713 s = s.getvalue().strip()
714 if s:
715 # The destructor *may* have printed an unraisable error, check it
716 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000717 self.assertTrue(s.startswith("Exception IOError: "), s)
718 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000719
720 def test_repr(self):
721 raw = self.MockRawIO()
722 b = self.tp(raw)
723 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
724 self.assertEqual(repr(b), "<%s>" % clsname)
725 raw.name = "dummy"
726 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
727 raw.name = b"dummy"
728 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000729
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000730 def test_flush_error_on_close(self):
731 raw = self.MockRawIO()
732 def bad_flush():
733 raise IOError()
734 raw.flush = bad_flush
735 b = self.tp(raw)
736 self.assertRaises(IOError, b.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600737 self.assertTrue(b.closed)
738
739 def test_close_error_on_close(self):
740 raw = self.MockRawIO()
741 def bad_flush():
742 raise IOError('flush')
743 def bad_close():
744 raise IOError('close')
745 raw.close = bad_close
746 b = self.tp(raw)
747 b.flush = bad_flush
748 with self.assertRaises(IOError) as err: # exception not swallowed
749 b.close()
750 self.assertEqual(err.exception.args, ('close',))
751 self.assertFalse(b.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000752
753 def test_multi_close(self):
754 raw = self.MockRawIO()
755 b = self.tp(raw)
756 b.close()
757 b.close()
758 b.close()
759 self.assertRaises(ValueError, b.flush)
760
Antoine Pitroufc9ead62010-12-21 21:26:55 +0000761 def test_readonly_attributes(self):
762 raw = self.MockRawIO()
763 buf = self.tp(raw)
764 x = self.MockRawIO()
765 with self.assertRaises((AttributeError, TypeError)):
766 buf.raw = x
767
Christian Heimes1a6387e2008-03-26 12:49:49 +0000768
Antoine Pitroubff5df02012-07-29 19:02:46 +0200769class SizeofTest:
770
771 @support.cpython_only
772 def test_sizeof(self):
773 bufsize1 = 4096
774 bufsize2 = 8192
775 rawio = self.MockRawIO()
776 bufio = self.tp(rawio, buffer_size=bufsize1)
777 size = sys.getsizeof(bufio) - bufsize1
778 rawio = self.MockRawIO()
779 bufio = self.tp(rawio, buffer_size=bufsize2)
780 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
781
782
Antoine Pitrou19690592009-06-12 20:14:08 +0000783class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
784 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000785
Antoine Pitrou19690592009-06-12 20:14:08 +0000786 def test_constructor(self):
787 rawio = self.MockRawIO([b"abc"])
788 bufio = self.tp(rawio)
789 bufio.__init__(rawio)
790 bufio.__init__(rawio, buffer_size=1024)
791 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000792 self.assertEqual(b"abc", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000793 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
794 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
795 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
796 rawio = self.MockRawIO([b"abc"])
797 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000798 self.assertEqual(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000799
Antoine Pitrou19690592009-06-12 20:14:08 +0000800 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000801 for arg in (None, 7):
802 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
803 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000804 self.assertEqual(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000805 # Invalid args
806 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000807
Antoine Pitrou19690592009-06-12 20:14:08 +0000808 def test_read1(self):
809 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
810 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000811 self.assertEqual(b"a", bufio.read(1))
812 self.assertEqual(b"b", bufio.read1(1))
813 self.assertEqual(rawio._reads, 1)
814 self.assertEqual(b"c", bufio.read1(100))
815 self.assertEqual(rawio._reads, 1)
816 self.assertEqual(b"d", bufio.read1(100))
817 self.assertEqual(rawio._reads, 2)
818 self.assertEqual(b"efg", bufio.read1(100))
819 self.assertEqual(rawio._reads, 3)
820 self.assertEqual(b"", bufio.read1(100))
821 self.assertEqual(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000822 # Invalid args
823 self.assertRaises(ValueError, bufio.read1, -1)
824
825 def test_readinto(self):
826 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
827 bufio = self.tp(rawio)
828 b = bytearray(2)
Ezio Melotti2623a372010-11-21 13:34:58 +0000829 self.assertEqual(bufio.readinto(b), 2)
830 self.assertEqual(b, b"ab")
831 self.assertEqual(bufio.readinto(b), 2)
832 self.assertEqual(b, b"cd")
833 self.assertEqual(bufio.readinto(b), 2)
834 self.assertEqual(b, b"ef")
835 self.assertEqual(bufio.readinto(b), 1)
836 self.assertEqual(b, b"gf")
837 self.assertEqual(bufio.readinto(b), 0)
838 self.assertEqual(b, b"gf")
Antoine Pitrou19690592009-06-12 20:14:08 +0000839
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000840 def test_readlines(self):
841 def bufio():
842 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
843 return self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000844 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
845 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
846 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000847
Antoine Pitrou19690592009-06-12 20:14:08 +0000848 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000849 data = b"abcdefghi"
850 dlen = len(data)
851
852 tests = [
853 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
854 [ 100, [ 3, 3, 3], [ dlen ] ],
855 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
856 ]
857
858 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000859 rawio = self.MockFileIO(data)
860 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000861 pos = 0
862 for nbytes in buf_read_sizes:
Ezio Melotti2623a372010-11-21 13:34:58 +0000863 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Christian Heimes1a6387e2008-03-26 12:49:49 +0000864 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000865 # this is mildly implementation-dependent
Ezio Melotti2623a372010-11-21 13:34:58 +0000866 self.assertEqual(rawio.read_history, raw_read_sizes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000867
Antoine Pitrou19690592009-06-12 20:14:08 +0000868 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000869 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000870 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
871 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000872 self.assertEqual(b"abcd", bufio.read(6))
873 self.assertEqual(b"e", bufio.read(1))
874 self.assertEqual(b"fg", bufio.read())
875 self.assertEqual(b"", bufio.peek(1))
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200876 self.assertIsNone(bufio.read())
Ezio Melotti2623a372010-11-21 13:34:58 +0000877 self.assertEqual(b"", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000878
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200879 rawio = self.MockRawIO((b"a", None, None))
880 self.assertEqual(b"a", rawio.readall())
881 self.assertIsNone(rawio.readall())
882
Antoine Pitrou19690592009-06-12 20:14:08 +0000883 def test_read_past_eof(self):
884 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
885 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000886
Ezio Melotti2623a372010-11-21 13:34:58 +0000887 self.assertEqual(b"abcdefg", bufio.read(9000))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000888
Antoine Pitrou19690592009-06-12 20:14:08 +0000889 def test_read_all(self):
890 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
891 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000892
Ezio Melotti2623a372010-11-21 13:34:58 +0000893 self.assertEqual(b"abcdefg", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000894
Victor Stinner6a102812010-04-27 23:55:59 +0000895 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +0000896 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +0000897 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000898 try:
899 # Write out many bytes with exactly the same number of 0's,
900 # 1's... 255's. This will help us check that concurrent reading
901 # doesn't duplicate or forget contents.
902 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +0000903 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000904 random.shuffle(l)
905 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000906 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000907 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000908 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000909 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000910 errors = []
911 results = []
912 def f():
913 try:
914 # Intra-buffer read then buffer-flushing read
915 for n in cycle([1, 19]):
916 s = bufio.read(n)
917 if not s:
918 break
919 # list.append() is atomic
920 results.append(s)
921 except Exception as e:
922 errors.append(e)
923 raise
924 threads = [threading.Thread(target=f) for x in range(20)]
925 for t in threads:
926 t.start()
927 time.sleep(0.02) # yield
928 for t in threads:
929 t.join()
930 self.assertFalse(errors,
931 "the following exceptions were caught: %r" % errors)
932 s = b''.join(results)
933 for i in range(256):
934 c = bytes(bytearray([i]))
935 self.assertEqual(s.count(c), N)
936 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +0000937 support.unlink(support.TESTFN)
938
939 def test_misbehaved_io(self):
940 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
941 bufio = self.tp(rawio)
942 self.assertRaises(IOError, bufio.seek, 0)
943 self.assertRaises(IOError, bufio.tell)
944
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000945 def test_no_extraneous_read(self):
946 # Issue #9550; when the raw IO object has satisfied the read request,
947 # we should not issue any additional reads, otherwise it may block
948 # (e.g. socket).
949 bufsize = 16
950 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
951 rawio = self.MockRawIO([b"x" * n])
952 bufio = self.tp(rawio, bufsize)
953 self.assertEqual(bufio.read(n), b"x" * n)
954 # Simple case: one raw read is enough to satisfy the request.
955 self.assertEqual(rawio._extraneous_reads, 0,
956 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
957 # A more complex case where two raw reads are needed to satisfy
958 # the request.
959 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
960 bufio = self.tp(rawio, bufsize)
961 self.assertEqual(bufio.read(n), b"x" * n)
962 self.assertEqual(rawio._extraneous_reads, 0,
963 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
964
965
Antoine Pitroubff5df02012-07-29 19:02:46 +0200966class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +0000967 tp = io.BufferedReader
968
969 def test_constructor(self):
970 BufferedReaderTest.test_constructor(self)
971 # The allocation can succeed on 32-bit builds, e.g. with more
972 # than 2GB RAM and a 64-bit kernel.
973 if sys.maxsize > 0x7FFFFFFF:
974 rawio = self.MockRawIO()
975 bufio = self.tp(rawio)
976 self.assertRaises((OverflowError, MemoryError, ValueError),
977 bufio.__init__, rawio, sys.maxsize)
978
979 def test_initialization(self):
980 rawio = self.MockRawIO([b"abc"])
981 bufio = self.tp(rawio)
982 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
983 self.assertRaises(ValueError, bufio.read)
984 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
985 self.assertRaises(ValueError, bufio.read)
986 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
987 self.assertRaises(ValueError, bufio.read)
988
989 def test_misbehaved_io_read(self):
990 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
991 bufio = self.tp(rawio)
992 # _pyio.BufferedReader seems to implement reading different, so that
993 # checking this is not so easy.
994 self.assertRaises(IOError, bufio.read, 10)
995
996 def test_garbage_collection(self):
997 # C BufferedReader objects are collected.
998 # The Python version has __del__, so it ends into gc.garbage instead
999 rawio = self.FileIO(support.TESTFN, "w+b")
1000 f = self.tp(rawio)
1001 f.f = f
1002 wr = weakref.ref(f)
1003 del f
1004 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001005 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00001006
1007class PyBufferedReaderTest(BufferedReaderTest):
1008 tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001009
1010
Antoine Pitrou19690592009-06-12 20:14:08 +00001011class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1012 write_mode = "wb"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001013
Antoine Pitrou19690592009-06-12 20:14:08 +00001014 def test_constructor(self):
1015 rawio = self.MockRawIO()
1016 bufio = self.tp(rawio)
1017 bufio.__init__(rawio)
1018 bufio.__init__(rawio, buffer_size=1024)
1019 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +00001020 self.assertEqual(3, bufio.write(b"abc"))
Antoine Pitrou19690592009-06-12 20:14:08 +00001021 bufio.flush()
1022 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1023 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1024 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1025 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +00001026 self.assertEqual(3, bufio.write(b"ghi"))
Antoine Pitrou19690592009-06-12 20:14:08 +00001027 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001028 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001029
Antoine Pitrou19690592009-06-12 20:14:08 +00001030 def test_detach_flush(self):
1031 raw = self.MockRawIO()
1032 buf = self.tp(raw)
1033 buf.write(b"howdy!")
1034 self.assertFalse(raw._write_stack)
1035 buf.detach()
1036 self.assertEqual(raw._write_stack, [b"howdy!"])
1037
1038 def test_write(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001039 # Write to the buffered IO but don't overflow the buffer.
Antoine Pitrou19690592009-06-12 20:14:08 +00001040 writer = self.MockRawIO()
1041 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001042 bufio.write(b"abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001043 self.assertFalse(writer._write_stack)
1044
Antoine Pitrou19690592009-06-12 20:14:08 +00001045 def test_write_overflow(self):
1046 writer = self.MockRawIO()
1047 bufio = self.tp(writer, 8)
1048 contents = b"abcdefghijklmnop"
1049 for n in range(0, len(contents), 3):
1050 bufio.write(contents[n:n+3])
1051 flushed = b"".join(writer._write_stack)
1052 # At least (total - 8) bytes were implicitly flushed, perhaps more
1053 # depending on the implementation.
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001054 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001055
Antoine Pitrou19690592009-06-12 20:14:08 +00001056 def check_writes(self, intermediate_func):
1057 # Lots of writes, test the flushed output is as expected.
1058 contents = bytes(range(256)) * 1000
1059 n = 0
1060 writer = self.MockRawIO()
1061 bufio = self.tp(writer, 13)
1062 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1063 def gen_sizes():
1064 for size in count(1):
1065 for i in range(15):
1066 yield size
1067 sizes = gen_sizes()
1068 while n < len(contents):
1069 size = min(next(sizes), len(contents) - n)
Ezio Melotti2623a372010-11-21 13:34:58 +00001070 self.assertEqual(bufio.write(contents[n:n+size]), size)
Antoine Pitrou19690592009-06-12 20:14:08 +00001071 intermediate_func(bufio)
1072 n += size
1073 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001074 self.assertEqual(contents,
Antoine Pitrou19690592009-06-12 20:14:08 +00001075 b"".join(writer._write_stack))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001076
Antoine Pitrou19690592009-06-12 20:14:08 +00001077 def test_writes(self):
1078 self.check_writes(lambda bufio: None)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001079
Antoine Pitrou19690592009-06-12 20:14:08 +00001080 def test_writes_and_flushes(self):
1081 self.check_writes(lambda bufio: bufio.flush())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001082
Antoine Pitrou19690592009-06-12 20:14:08 +00001083 def test_writes_and_seeks(self):
1084 def _seekabs(bufio):
1085 pos = bufio.tell()
1086 bufio.seek(pos + 1, 0)
1087 bufio.seek(pos - 1, 0)
1088 bufio.seek(pos, 0)
1089 self.check_writes(_seekabs)
1090 def _seekrel(bufio):
1091 pos = bufio.seek(0, 1)
1092 bufio.seek(+1, 1)
1093 bufio.seek(-1, 1)
1094 bufio.seek(pos, 0)
1095 self.check_writes(_seekrel)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001096
Antoine Pitrou19690592009-06-12 20:14:08 +00001097 def test_writes_and_truncates(self):
1098 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001099
Antoine Pitrou19690592009-06-12 20:14:08 +00001100 def test_write_non_blocking(self):
1101 raw = self.MockNonBlockWriterIO()
1102 bufio = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001103
Ezio Melotti2623a372010-11-21 13:34:58 +00001104 self.assertEqual(bufio.write(b"abcd"), 4)
1105 self.assertEqual(bufio.write(b"efghi"), 5)
Antoine Pitrou19690592009-06-12 20:14:08 +00001106 # 1 byte will be written, the rest will be buffered
1107 raw.block_on(b"k")
Ezio Melotti2623a372010-11-21 13:34:58 +00001108 self.assertEqual(bufio.write(b"jklmn"), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001109
Antoine Pitrou19690592009-06-12 20:14:08 +00001110 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1111 raw.block_on(b"0")
1112 try:
1113 bufio.write(b"opqrwxyz0123456789")
1114 except self.BlockingIOError as e:
1115 written = e.characters_written
1116 else:
1117 self.fail("BlockingIOError should have been raised")
Ezio Melotti2623a372010-11-21 13:34:58 +00001118 self.assertEqual(written, 16)
1119 self.assertEqual(raw.pop_written(),
Antoine Pitrou19690592009-06-12 20:14:08 +00001120 b"abcdefghijklmnopqrwxyz")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001121
Ezio Melotti2623a372010-11-21 13:34:58 +00001122 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Antoine Pitrou19690592009-06-12 20:14:08 +00001123 s = raw.pop_written()
1124 # Previously buffered bytes were flushed
1125 self.assertTrue(s.startswith(b"01234567A"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001126
Antoine Pitrou19690592009-06-12 20:14:08 +00001127 def test_write_and_rewind(self):
1128 raw = io.BytesIO()
1129 bufio = self.tp(raw, 4)
1130 self.assertEqual(bufio.write(b"abcdef"), 6)
1131 self.assertEqual(bufio.tell(), 6)
1132 bufio.seek(0, 0)
1133 self.assertEqual(bufio.write(b"XY"), 2)
1134 bufio.seek(6, 0)
1135 self.assertEqual(raw.getvalue(), b"XYcdef")
1136 self.assertEqual(bufio.write(b"123456"), 6)
1137 bufio.flush()
1138 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001139
Antoine Pitrou19690592009-06-12 20:14:08 +00001140 def test_flush(self):
1141 writer = self.MockRawIO()
1142 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001143 bufio.write(b"abc")
1144 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001145 self.assertEqual(b"abc", writer._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001146
Antoine Pitrou78e761e2012-10-16 22:57:11 +02001147 def test_writelines(self):
1148 l = [b'ab', b'cd', b'ef']
1149 writer = self.MockRawIO()
1150 bufio = self.tp(writer, 8)
1151 bufio.writelines(l)
1152 bufio.flush()
1153 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1154
1155 def test_writelines_userlist(self):
1156 l = UserList([b'ab', b'cd', b'ef'])
1157 writer = self.MockRawIO()
1158 bufio = self.tp(writer, 8)
1159 bufio.writelines(l)
1160 bufio.flush()
1161 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1162
1163 def test_writelines_error(self):
1164 writer = self.MockRawIO()
1165 bufio = self.tp(writer, 8)
1166 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1167 self.assertRaises(TypeError, bufio.writelines, None)
1168
Antoine Pitrou19690592009-06-12 20:14:08 +00001169 def test_destructor(self):
1170 writer = self.MockRawIO()
1171 bufio = self.tp(writer, 8)
1172 bufio.write(b"abc")
1173 del bufio
1174 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001175 self.assertEqual(b"abc", writer._write_stack[0])
Antoine Pitrou19690592009-06-12 20:14:08 +00001176
1177 def test_truncate(self):
1178 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001179 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001180 bufio = self.tp(raw, 8)
1181 bufio.write(b"abcdef")
1182 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001183 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001184 with self.open(support.TESTFN, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001185 self.assertEqual(f.read(), b"abc")
1186
Victor Stinner6a102812010-04-27 23:55:59 +00001187 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +00001188 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +00001189 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001190 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001191 # Write out many bytes from many threads and test they were
1192 # all flushed.
1193 N = 1000
1194 contents = bytes(range(256)) * N
1195 sizes = cycle([1, 19])
1196 n = 0
1197 queue = deque()
1198 while n < len(contents):
1199 size = next(sizes)
1200 queue.append(contents[n:n+size])
1201 n += size
1202 del contents
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001203 # We use a real file object because it allows us to
1204 # exercise situations where the GIL is released before
1205 # writing the buffer to the raw streams. This is in addition
1206 # to concurrency issues due to switching threads in the middle
1207 # of Python code.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001208 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001209 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001210 errors = []
1211 def f():
1212 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001213 while True:
1214 try:
1215 s = queue.popleft()
1216 except IndexError:
1217 return
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001218 bufio.write(s)
1219 except Exception as e:
1220 errors.append(e)
1221 raise
1222 threads = [threading.Thread(target=f) for x in range(20)]
1223 for t in threads:
1224 t.start()
1225 time.sleep(0.02) # yield
1226 for t in threads:
1227 t.join()
1228 self.assertFalse(errors,
1229 "the following exceptions were caught: %r" % errors)
Antoine Pitrou19690592009-06-12 20:14:08 +00001230 bufio.close()
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001231 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001232 s = f.read()
1233 for i in range(256):
Ezio Melotti2623a372010-11-21 13:34:58 +00001234 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001235 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001236 support.unlink(support.TESTFN)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001237
Antoine Pitrou19690592009-06-12 20:14:08 +00001238 def test_misbehaved_io(self):
1239 rawio = self.MisbehavedRawIO()
1240 bufio = self.tp(rawio, 5)
1241 self.assertRaises(IOError, bufio.seek, 0)
1242 self.assertRaises(IOError, bufio.tell)
1243 self.assertRaises(IOError, bufio.write, b"abcdef")
1244
1245 def test_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001246 with support.check_warnings(("max_buffer_size is deprecated",
1247 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001248 self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001249
Benjamin Petersona2d6d712012-12-20 12:24:10 -06001250 def test_write_error_on_close(self):
1251 raw = self.MockRawIO()
1252 def bad_write(b):
1253 raise IOError()
1254 raw.write = bad_write
1255 b = self.tp(raw)
1256 b.write(b'spam')
1257 self.assertRaises(IOError, b.close) # exception not swallowed
1258 self.assertTrue(b.closed)
1259
Antoine Pitrou19690592009-06-12 20:14:08 +00001260
Antoine Pitroubff5df02012-07-29 19:02:46 +02001261class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001262 tp = io.BufferedWriter
1263
1264 def test_constructor(self):
1265 BufferedWriterTest.test_constructor(self)
1266 # The allocation can succeed on 32-bit builds, e.g. with more
1267 # than 2GB RAM and a 64-bit kernel.
1268 if sys.maxsize > 0x7FFFFFFF:
1269 rawio = self.MockRawIO()
1270 bufio = self.tp(rawio)
1271 self.assertRaises((OverflowError, MemoryError, ValueError),
1272 bufio.__init__, rawio, sys.maxsize)
1273
1274 def test_initialization(self):
1275 rawio = self.MockRawIO()
1276 bufio = self.tp(rawio)
1277 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1278 self.assertRaises(ValueError, bufio.write, b"def")
1279 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1280 self.assertRaises(ValueError, bufio.write, b"def")
1281 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1282 self.assertRaises(ValueError, bufio.write, b"def")
1283
1284 def test_garbage_collection(self):
1285 # C BufferedWriter objects are collected, and collecting them flushes
1286 # all data to disk.
1287 # The Python version has __del__, so it ends into gc.garbage instead
1288 rawio = self.FileIO(support.TESTFN, "w+b")
1289 f = self.tp(rawio)
1290 f.write(b"123xxx")
1291 f.x = f
1292 wr = weakref.ref(f)
1293 del f
1294 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001295 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001296 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001297 self.assertEqual(f.read(), b"123xxx")
1298
1299
1300class PyBufferedWriterTest(BufferedWriterTest):
1301 tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001302
1303class BufferedRWPairTest(unittest.TestCase):
1304
Antoine Pitrou19690592009-06-12 20:14:08 +00001305 def test_constructor(self):
1306 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001307 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001308
Antoine Pitrou19690592009-06-12 20:14:08 +00001309 def test_detach(self):
1310 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1311 self.assertRaises(self.UnsupportedOperation, pair.detach)
1312
1313 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001314 with support.check_warnings(("max_buffer_size is deprecated",
1315 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001316 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001317
1318 def test_constructor_with_not_readable(self):
1319 class NotReadable(MockRawIO):
1320 def readable(self):
1321 return False
1322
1323 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1324
1325 def test_constructor_with_not_writeable(self):
1326 class NotWriteable(MockRawIO):
1327 def writable(self):
1328 return False
1329
1330 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1331
1332 def test_read(self):
1333 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1334
1335 self.assertEqual(pair.read(3), b"abc")
1336 self.assertEqual(pair.read(1), b"d")
1337 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001338 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1339 self.assertEqual(pair.read(None), b"abc")
1340
1341 def test_readlines(self):
1342 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1343 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1344 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1345 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001346
1347 def test_read1(self):
1348 # .read1() is delegated to the underlying reader object, so this test
1349 # can be shallow.
1350 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1351
1352 self.assertEqual(pair.read1(3), b"abc")
1353
1354 def test_readinto(self):
1355 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1356
1357 data = bytearray(5)
1358 self.assertEqual(pair.readinto(data), 5)
1359 self.assertEqual(data, b"abcde")
1360
1361 def test_write(self):
1362 w = self.MockRawIO()
1363 pair = self.tp(self.MockRawIO(), w)
1364
1365 pair.write(b"abc")
1366 pair.flush()
1367 pair.write(b"def")
1368 pair.flush()
1369 self.assertEqual(w._write_stack, [b"abc", b"def"])
1370
1371 def test_peek(self):
1372 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1373
1374 self.assertTrue(pair.peek(3).startswith(b"abc"))
1375 self.assertEqual(pair.read(3), b"abc")
1376
1377 def test_readable(self):
1378 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1379 self.assertTrue(pair.readable())
1380
1381 def test_writeable(self):
1382 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1383 self.assertTrue(pair.writable())
1384
1385 def test_seekable(self):
1386 # BufferedRWPairs are never seekable, even if their readers and writers
1387 # are.
1388 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1389 self.assertFalse(pair.seekable())
1390
1391 # .flush() is delegated to the underlying writer object and has been
1392 # tested in the test_write method.
1393
1394 def test_close_and_closed(self):
1395 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1396 self.assertFalse(pair.closed)
1397 pair.close()
1398 self.assertTrue(pair.closed)
1399
1400 def test_isatty(self):
1401 class SelectableIsAtty(MockRawIO):
1402 def __init__(self, isatty):
1403 MockRawIO.__init__(self)
1404 self._isatty = isatty
1405
1406 def isatty(self):
1407 return self._isatty
1408
1409 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1410 self.assertFalse(pair.isatty())
1411
1412 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1413 self.assertTrue(pair.isatty())
1414
1415 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1416 self.assertTrue(pair.isatty())
1417
1418 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1419 self.assertTrue(pair.isatty())
1420
1421class CBufferedRWPairTest(BufferedRWPairTest):
1422 tp = io.BufferedRWPair
1423
1424class PyBufferedRWPairTest(BufferedRWPairTest):
1425 tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001426
1427
Antoine Pitrou19690592009-06-12 20:14:08 +00001428class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1429 read_mode = "rb+"
1430 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001431
Antoine Pitrou19690592009-06-12 20:14:08 +00001432 def test_constructor(self):
1433 BufferedReaderTest.test_constructor(self)
1434 BufferedWriterTest.test_constructor(self)
1435
1436 def test_read_and_write(self):
1437 raw = self.MockRawIO((b"asdf", b"ghjk"))
1438 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001439
1440 self.assertEqual(b"as", rw.read(2))
1441 rw.write(b"ddd")
1442 rw.write(b"eee")
1443 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001444 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001445 self.assertEqual(b"dddeee", raw._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001446
Antoine Pitrou19690592009-06-12 20:14:08 +00001447 def test_seek_and_tell(self):
1448 raw = self.BytesIO(b"asdfghjkl")
1449 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001450
Ezio Melotti2623a372010-11-21 13:34:58 +00001451 self.assertEqual(b"as", rw.read(2))
1452 self.assertEqual(2, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001453 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001454 self.assertEqual(b"asdf", rw.read(4))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001455
Antoine Pitrou808cec52011-08-20 15:40:58 +02001456 rw.write(b"123f")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001457 rw.seek(0, 0)
Antoine Pitrou808cec52011-08-20 15:40:58 +02001458 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001459 self.assertEqual(9, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001460 rw.seek(-4, 2)
Ezio Melotti2623a372010-11-21 13:34:58 +00001461 self.assertEqual(5, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001462 rw.seek(2, 1)
Ezio Melotti2623a372010-11-21 13:34:58 +00001463 self.assertEqual(7, rw.tell())
1464 self.assertEqual(b"fl", rw.read(11))
Antoine Pitrou808cec52011-08-20 15:40:58 +02001465 rw.flush()
1466 self.assertEqual(b"asdf123fl", raw.getvalue())
1467
Christian Heimes1a6387e2008-03-26 12:49:49 +00001468 self.assertRaises(TypeError, rw.seek, 0.0)
1469
Antoine Pitrou19690592009-06-12 20:14:08 +00001470 def check_flush_and_read(self, read_func):
1471 raw = self.BytesIO(b"abcdefghi")
1472 bufio = self.tp(raw)
1473
Ezio Melotti2623a372010-11-21 13:34:58 +00001474 self.assertEqual(b"ab", read_func(bufio, 2))
Antoine Pitrou19690592009-06-12 20:14:08 +00001475 bufio.write(b"12")
Ezio Melotti2623a372010-11-21 13:34:58 +00001476 self.assertEqual(b"ef", read_func(bufio, 2))
1477 self.assertEqual(6, bufio.tell())
Antoine Pitrou19690592009-06-12 20:14:08 +00001478 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001479 self.assertEqual(6, bufio.tell())
1480 self.assertEqual(b"ghi", read_func(bufio))
Antoine Pitrou19690592009-06-12 20:14:08 +00001481 raw.seek(0, 0)
1482 raw.write(b"XYZ")
1483 # flush() resets the read buffer
1484 bufio.flush()
1485 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001486 self.assertEqual(b"XYZ", read_func(bufio, 3))
Antoine Pitrou19690592009-06-12 20:14:08 +00001487
1488 def test_flush_and_read(self):
1489 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1490
1491 def test_flush_and_readinto(self):
1492 def _readinto(bufio, n=-1):
1493 b = bytearray(n if n >= 0 else 9999)
1494 n = bufio.readinto(b)
1495 return bytes(b[:n])
1496 self.check_flush_and_read(_readinto)
1497
1498 def test_flush_and_peek(self):
1499 def _peek(bufio, n=-1):
1500 # This relies on the fact that the buffer can contain the whole
1501 # raw stream, otherwise peek() can return less.
1502 b = bufio.peek(n)
1503 if n != -1:
1504 b = b[:n]
1505 bufio.seek(len(b), 1)
1506 return b
1507 self.check_flush_and_read(_peek)
1508
1509 def test_flush_and_write(self):
1510 raw = self.BytesIO(b"abcdefghi")
1511 bufio = self.tp(raw)
1512
1513 bufio.write(b"123")
1514 bufio.flush()
1515 bufio.write(b"45")
1516 bufio.flush()
1517 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001518 self.assertEqual(b"12345fghi", raw.getvalue())
1519 self.assertEqual(b"12345fghi", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +00001520
1521 def test_threads(self):
1522 BufferedReaderTest.test_threads(self)
1523 BufferedWriterTest.test_threads(self)
1524
1525 def test_writes_and_peek(self):
1526 def _peek(bufio):
1527 bufio.peek(1)
1528 self.check_writes(_peek)
1529 def _peek(bufio):
1530 pos = bufio.tell()
1531 bufio.seek(-1, 1)
1532 bufio.peek(1)
1533 bufio.seek(pos, 0)
1534 self.check_writes(_peek)
1535
1536 def test_writes_and_reads(self):
1537 def _read(bufio):
1538 bufio.seek(-1, 1)
1539 bufio.read(1)
1540 self.check_writes(_read)
1541
1542 def test_writes_and_read1s(self):
1543 def _read1(bufio):
1544 bufio.seek(-1, 1)
1545 bufio.read1(1)
1546 self.check_writes(_read1)
1547
1548 def test_writes_and_readintos(self):
1549 def _read(bufio):
1550 bufio.seek(-1, 1)
1551 bufio.readinto(bytearray(1))
1552 self.check_writes(_read)
1553
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001554 def test_write_after_readahead(self):
1555 # Issue #6629: writing after the buffer was filled by readahead should
1556 # first rewind the raw stream.
1557 for overwrite_size in [1, 5]:
1558 raw = self.BytesIO(b"A" * 10)
1559 bufio = self.tp(raw, 4)
1560 # Trigger readahead
1561 self.assertEqual(bufio.read(1), b"A")
1562 self.assertEqual(bufio.tell(), 1)
1563 # Overwriting should rewind the raw stream if it needs so
1564 bufio.write(b"B" * overwrite_size)
1565 self.assertEqual(bufio.tell(), overwrite_size + 1)
1566 # If the write size was smaller than the buffer size, flush() and
1567 # check that rewind happens.
1568 bufio.flush()
1569 self.assertEqual(bufio.tell(), overwrite_size + 1)
1570 s = raw.getvalue()
1571 self.assertEqual(s,
1572 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1573
Antoine Pitrouee46a7b2011-05-13 00:31:52 +02001574 def test_write_rewind_write(self):
1575 # Various combinations of reading / writing / seeking backwards / writing again
1576 def mutate(bufio, pos1, pos2):
1577 assert pos2 >= pos1
1578 # Fill the buffer
1579 bufio.seek(pos1)
1580 bufio.read(pos2 - pos1)
1581 bufio.write(b'\x02')
1582 # This writes earlier than the previous write, but still inside
1583 # the buffer.
1584 bufio.seek(pos1)
1585 bufio.write(b'\x01')
1586
1587 b = b"\x80\x81\x82\x83\x84"
1588 for i in range(0, len(b)):
1589 for j in range(i, len(b)):
1590 raw = self.BytesIO(b)
1591 bufio = self.tp(raw, 100)
1592 mutate(bufio, i, j)
1593 bufio.flush()
1594 expected = bytearray(b)
1595 expected[j] = 2
1596 expected[i] = 1
1597 self.assertEqual(raw.getvalue(), expected,
1598 "failed result for i=%d, j=%d" % (i, j))
1599
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001600 def test_truncate_after_read_or_write(self):
1601 raw = self.BytesIO(b"A" * 10)
1602 bufio = self.tp(raw, 100)
1603 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1604 self.assertEqual(bufio.truncate(), 2)
1605 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1606 self.assertEqual(bufio.truncate(), 4)
1607
Antoine Pitrou19690592009-06-12 20:14:08 +00001608 def test_misbehaved_io(self):
1609 BufferedReaderTest.test_misbehaved_io(self)
1610 BufferedWriterTest.test_misbehaved_io(self)
1611
Antoine Pitrou808cec52011-08-20 15:40:58 +02001612 def test_interleaved_read_write(self):
1613 # Test for issue #12213
1614 with self.BytesIO(b'abcdefgh') as raw:
1615 with self.tp(raw, 100) as f:
1616 f.write(b"1")
1617 self.assertEqual(f.read(1), b'b')
1618 f.write(b'2')
1619 self.assertEqual(f.read1(1), b'd')
1620 f.write(b'3')
1621 buf = bytearray(1)
1622 f.readinto(buf)
1623 self.assertEqual(buf, b'f')
1624 f.write(b'4')
1625 self.assertEqual(f.peek(1), b'h')
1626 f.flush()
1627 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1628
1629 with self.BytesIO(b'abc') as raw:
1630 with self.tp(raw, 100) as f:
1631 self.assertEqual(f.read(1), b'a')
1632 f.write(b"2")
1633 self.assertEqual(f.read(1), b'c')
1634 f.flush()
1635 self.assertEqual(raw.getvalue(), b'a2c')
1636
1637 def test_interleaved_readline_write(self):
1638 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1639 with self.tp(raw) as f:
1640 f.write(b'1')
1641 self.assertEqual(f.readline(), b'b\n')
1642 f.write(b'2')
1643 self.assertEqual(f.readline(), b'def\n')
1644 f.write(b'3')
1645 self.assertEqual(f.readline(), b'\n')
1646 f.flush()
1647 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1648
Antoine Pitroubff5df02012-07-29 19:02:46 +02001649class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
1650 BufferedRandomTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001651 tp = io.BufferedRandom
1652
1653 def test_constructor(self):
1654 BufferedRandomTest.test_constructor(self)
1655 # The allocation can succeed on 32-bit builds, e.g. with more
1656 # than 2GB RAM and a 64-bit kernel.
1657 if sys.maxsize > 0x7FFFFFFF:
1658 rawio = self.MockRawIO()
1659 bufio = self.tp(rawio)
1660 self.assertRaises((OverflowError, MemoryError, ValueError),
1661 bufio.__init__, rawio, sys.maxsize)
1662
1663 def test_garbage_collection(self):
1664 CBufferedReaderTest.test_garbage_collection(self)
1665 CBufferedWriterTest.test_garbage_collection(self)
1666
1667class PyBufferedRandomTest(BufferedRandomTest):
1668 tp = pyio.BufferedRandom
1669
1670
Christian Heimes1a6387e2008-03-26 12:49:49 +00001671# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1672# properties:
1673# - A single output character can correspond to many bytes of input.
1674# - The number of input bytes to complete the character can be
1675# undetermined until the last input byte is received.
1676# - The number of input bytes can vary depending on previous input.
1677# - A single input byte can correspond to many characters of output.
1678# - The number of output characters can be undetermined until the
1679# last input byte is received.
1680# - The number of output characters can vary depending on previous input.
1681
1682class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1683 """
1684 For testing seek/tell behavior with a stateful, buffering decoder.
1685
1686 Input is a sequence of words. Words may be fixed-length (length set
1687 by input) or variable-length (period-terminated). In variable-length
1688 mode, extra periods are ignored. Possible words are:
1689 - 'i' followed by a number sets the input length, I (maximum 99).
1690 When I is set to 0, words are space-terminated.
1691 - 'o' followed by a number sets the output length, O (maximum 99).
1692 - Any other word is converted into a word followed by a period on
1693 the output. The output word consists of the input word truncated
1694 or padded out with hyphens to make its length equal to O. If O
1695 is 0, the word is output verbatim without truncating or padding.
1696 I and O are initially set to 1. When I changes, any buffered input is
1697 re-scanned according to the new I. EOF also terminates the last word.
1698 """
1699
1700 def __init__(self, errors='strict'):
1701 codecs.IncrementalDecoder.__init__(self, errors)
1702 self.reset()
1703
1704 def __repr__(self):
1705 return '<SID %x>' % id(self)
1706
1707 def reset(self):
1708 self.i = 1
1709 self.o = 1
1710 self.buffer = bytearray()
1711
1712 def getstate(self):
1713 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1714 return bytes(self.buffer), i*100 + o
1715
1716 def setstate(self, state):
1717 buffer, io = state
1718 self.buffer = bytearray(buffer)
1719 i, o = divmod(io, 100)
1720 self.i, self.o = i ^ 1, o ^ 1
1721
1722 def decode(self, input, final=False):
1723 output = ''
1724 for b in input:
1725 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001726 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +00001727 if self.buffer:
1728 output += self.process_word()
1729 else:
1730 self.buffer.append(b)
1731 else: # fixed-length, terminate after self.i bytes
1732 self.buffer.append(b)
1733 if len(self.buffer) == self.i:
1734 output += self.process_word()
1735 if final and self.buffer: # EOF terminates the last word
1736 output += self.process_word()
1737 return output
1738
1739 def process_word(self):
1740 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001741 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001742 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001743 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001744 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1745 else:
1746 output = self.buffer.decode('ascii')
1747 if len(output) < self.o:
1748 output += '-'*self.o # pad out with hyphens
1749 if self.o:
1750 output = output[:self.o] # truncate to output length
1751 output += '.'
1752 self.buffer = bytearray()
1753 return output
1754
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001755 codecEnabled = False
1756
1757 @classmethod
1758 def lookupTestDecoder(cls, name):
1759 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001760 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001761 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001762 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001763 incrementalencoder=None,
1764 streamreader=None, streamwriter=None,
1765 incrementaldecoder=cls)
1766
1767# Register the previous decoder for testing.
1768# Disabled by default, tests will enable it.
1769codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1770
1771
Christian Heimes1a6387e2008-03-26 12:49:49 +00001772class StatefulIncrementalDecoderTest(unittest.TestCase):
1773 """
1774 Make sure the StatefulIncrementalDecoder actually works.
1775 """
1776
1777 test_cases = [
1778 # I=1, O=1 (fixed-length input == fixed-length output)
1779 (b'abcd', False, 'a.b.c.d.'),
1780 # I=0, O=0 (variable-length input, variable-length output)
1781 (b'oiabcd', True, 'abcd.'),
1782 # I=0, O=0 (should ignore extra periods)
1783 (b'oi...abcd...', True, 'abcd.'),
1784 # I=0, O=6 (variable-length input, fixed-length output)
1785 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1786 # I=2, O=6 (fixed-length input < fixed-length output)
1787 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1788 # I=6, O=3 (fixed-length input > fixed-length output)
1789 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1790 # I=0, then 3; O=29, then 15 (with longer output)
1791 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1792 'a----------------------------.' +
1793 'b----------------------------.' +
1794 'cde--------------------------.' +
1795 'abcdefghijabcde.' +
1796 'a.b------------.' +
1797 '.c.------------.' +
1798 'd.e------------.' +
1799 'k--------------.' +
1800 'l--------------.' +
1801 'm--------------.')
1802 ]
1803
Antoine Pitrou19690592009-06-12 20:14:08 +00001804 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001805 # Try a few one-shot test cases.
1806 for input, eof, output in self.test_cases:
1807 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001808 self.assertEqual(d.decode(input, eof), output)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001809
1810 # Also test an unfinished decode, followed by forcing EOF.
1811 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001812 self.assertEqual(d.decode(b'oiabcd'), '')
1813 self.assertEqual(d.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001814
1815class TextIOWrapperTest(unittest.TestCase):
1816
1817 def setUp(self):
1818 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1819 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00001820 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001821
1822 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001823 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001824
Antoine Pitrou19690592009-06-12 20:14:08 +00001825 def test_constructor(self):
1826 r = self.BytesIO(b"\xc3\xa9\n\n")
1827 b = self.BufferedReader(r, 1000)
1828 t = self.TextIOWrapper(b)
1829 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti2623a372010-11-21 13:34:58 +00001830 self.assertEqual(t.encoding, "latin1")
1831 self.assertEqual(t.line_buffering, False)
Antoine Pitrou19690592009-06-12 20:14:08 +00001832 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti2623a372010-11-21 13:34:58 +00001833 self.assertEqual(t.encoding, "utf8")
1834 self.assertEqual(t.line_buffering, True)
1835 self.assertEqual("\xe9\n", t.readline())
Antoine Pitrou19690592009-06-12 20:14:08 +00001836 self.assertRaises(TypeError, t.__init__, b, newline=42)
1837 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1838
1839 def test_detach(self):
1840 r = self.BytesIO()
1841 b = self.BufferedWriter(r)
1842 t = self.TextIOWrapper(b)
1843 self.assertIs(t.detach(), b)
1844
1845 t = self.TextIOWrapper(b, encoding="ascii")
1846 t.write("howdy")
1847 self.assertFalse(r.getvalue())
1848 t.detach()
1849 self.assertEqual(r.getvalue(), b"howdy")
1850 self.assertRaises(ValueError, t.detach)
1851
1852 def test_repr(self):
1853 raw = self.BytesIO("hello".encode("utf-8"))
1854 b = self.BufferedReader(raw)
1855 t = self.TextIOWrapper(b, encoding="utf-8")
1856 modname = self.TextIOWrapper.__module__
1857 self.assertEqual(repr(t),
1858 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1859 raw.name = "dummy"
1860 self.assertEqual(repr(t),
1861 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1862 raw.name = b"dummy"
1863 self.assertEqual(repr(t),
1864 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1865
1866 def test_line_buffering(self):
1867 r = self.BytesIO()
1868 b = self.BufferedWriter(r, 1000)
1869 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1870 t.write("X")
Ezio Melotti2623a372010-11-21 13:34:58 +00001871 self.assertEqual(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001872 t.write("Y\nZ")
Ezio Melotti2623a372010-11-21 13:34:58 +00001873 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001874 t.write("A\rB")
Ezio Melotti2623a372010-11-21 13:34:58 +00001875 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001876
Antoine Pitrou19690592009-06-12 20:14:08 +00001877 def test_encoding(self):
1878 # Check the encoding attribute is always set, and valid
1879 b = self.BytesIO()
1880 t = self.TextIOWrapper(b, encoding="utf8")
1881 self.assertEqual(t.encoding, "utf8")
1882 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001883 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001884 codecs.lookup(t.encoding)
1885
1886 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001887 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001888 b = self.BytesIO(b"abc\n\xff\n")
1889 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001890 self.assertRaises(UnicodeError, t.read)
1891 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001892 b = self.BytesIO(b"abc\n\xff\n")
1893 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001894 self.assertRaises(UnicodeError, t.read)
1895 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001896 b = self.BytesIO(b"abc\n\xff\n")
1897 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti2623a372010-11-21 13:34:58 +00001898 self.assertEqual(t.read(), "abc\n\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001899 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001900 b = self.BytesIO(b"abc\n\xff\n")
1901 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti2623a372010-11-21 13:34:58 +00001902 self.assertEqual(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001903
Antoine Pitrou19690592009-06-12 20:14:08 +00001904 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001905 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001906 b = self.BytesIO()
1907 t = self.TextIOWrapper(b, encoding="ascii")
1908 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001909 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001910 b = self.BytesIO()
1911 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1912 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001913 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001914 b = self.BytesIO()
1915 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001916 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001917 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001918 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001919 self.assertEqual(b.getvalue(), b"abcdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001920 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001921 b = self.BytesIO()
1922 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001923 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001924 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001925 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001926 self.assertEqual(b.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001927
Antoine Pitrou19690592009-06-12 20:14:08 +00001928 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001929 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1930
1931 tests = [
1932 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1933 [ '', input_lines ],
1934 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1935 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1936 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1937 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001938 encodings = (
1939 'utf-8', 'latin-1',
1940 'utf-16', 'utf-16-le', 'utf-16-be',
1941 'utf-32', 'utf-32-le', 'utf-32-be',
1942 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00001943
1944 # Try a range of buffer sizes to test the case where \r is the last
1945 # character in TextIOWrapper._pending_line.
1946 for encoding in encodings:
1947 # XXX: str.encode() should return bytes
1948 data = bytes(''.join(input_lines).encode(encoding))
1949 for do_reads in (False, True):
1950 for bufsize in range(1, 10):
1951 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001952 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1953 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00001954 encoding=encoding)
1955 if do_reads:
1956 got_lines = []
1957 while True:
1958 c2 = textio.read(2)
1959 if c2 == '':
1960 break
Ezio Melotti2623a372010-11-21 13:34:58 +00001961 self.assertEqual(len(c2), 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001962 got_lines.append(c2 + textio.readline())
1963 else:
1964 got_lines = list(textio)
1965
1966 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti2623a372010-11-21 13:34:58 +00001967 self.assertEqual(got_line, exp_line)
1968 self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001969
Antoine Pitrou19690592009-06-12 20:14:08 +00001970 def test_newlines_input(self):
1971 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001972 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1973 for newline, expected in [
1974 (None, normalized.decode("ascii").splitlines(True)),
1975 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00001976 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1977 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1978 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00001979 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00001980 buf = self.BytesIO(testdata)
1981 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti2623a372010-11-21 13:34:58 +00001982 self.assertEqual(txt.readlines(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001983 txt.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001984 self.assertEqual(txt.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001985
Antoine Pitrou19690592009-06-12 20:14:08 +00001986 def test_newlines_output(self):
1987 testdict = {
1988 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1989 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1990 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1991 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1992 }
1993 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1994 for newline, expected in tests:
1995 buf = self.BytesIO()
1996 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1997 txt.write("AAA\nB")
1998 txt.write("BB\nCCC\n")
1999 txt.write("X\rY\r\nZ")
2000 txt.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002001 self.assertEqual(buf.closed, False)
2002 self.assertEqual(buf.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00002003
2004 def test_destructor(self):
2005 l = []
2006 base = self.BytesIO
2007 class MyBytesIO(base):
2008 def close(self):
2009 l.append(self.getvalue())
2010 base.close(self)
2011 b = MyBytesIO()
2012 t = self.TextIOWrapper(b, encoding="ascii")
2013 t.write("abc")
2014 del t
2015 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00002016 self.assertEqual([b"abc"], l)
Antoine Pitrou19690592009-06-12 20:14:08 +00002017
2018 def test_override_destructor(self):
2019 record = []
2020 class MyTextIO(self.TextIOWrapper):
2021 def __del__(self):
2022 record.append(1)
2023 try:
2024 f = super(MyTextIO, self).__del__
2025 except AttributeError:
2026 pass
2027 else:
2028 f()
2029 def close(self):
2030 record.append(2)
2031 super(MyTextIO, self).close()
2032 def flush(self):
2033 record.append(3)
2034 super(MyTextIO, self).flush()
2035 b = self.BytesIO()
2036 t = MyTextIO(b, encoding="ascii")
2037 del t
2038 support.gc_collect()
2039 self.assertEqual(record, [1, 2, 3])
2040
2041 def test_error_through_destructor(self):
2042 # Test that the exception state is not modified by a destructor,
2043 # even if close() fails.
2044 rawio = self.CloseFailureIO()
2045 def f():
2046 self.TextIOWrapper(rawio).xyzzy
2047 with support.captured_output("stderr") as s:
2048 self.assertRaises(AttributeError, f)
2049 s = s.getvalue().strip()
2050 if s:
2051 # The destructor *may* have printed an unraisable error, check it
2052 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002053 self.assertTrue(s.startswith("Exception IOError: "), s)
2054 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002055
2056 # Systematic tests of the text I/O API
2057
Antoine Pitrou19690592009-06-12 20:14:08 +00002058 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002059 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2060 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00002061 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002062 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002063 self.assertEqual(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002064 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002065 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002066 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002067 self.assertEqual(f.tell(), 0)
2068 self.assertEqual(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002069 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002070 self.assertEqual(f.seek(0), 0)
2071 self.assertEqual(f.read(None), "abc")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002072 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002073 self.assertEqual(f.read(2), "ab")
2074 self.assertEqual(f.read(1), "c")
2075 self.assertEqual(f.read(1), "")
2076 self.assertEqual(f.read(), "")
2077 self.assertEqual(f.tell(), cookie)
2078 self.assertEqual(f.seek(0), 0)
2079 self.assertEqual(f.seek(0, 2), cookie)
2080 self.assertEqual(f.write("def"), 3)
2081 self.assertEqual(f.seek(cookie), cookie)
2082 self.assertEqual(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002083 if enc.startswith("utf"):
2084 self.multi_line_test(f, enc)
2085 f.close()
2086
2087 def multi_line_test(self, f, enc):
2088 f.seek(0)
2089 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00002090 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002091 wlines = []
2092 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2093 chars = []
2094 for i in range(size):
2095 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00002096 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002097 wlines.append((f.tell(), line))
2098 f.write(line)
2099 f.seek(0)
2100 rlines = []
2101 while True:
2102 pos = f.tell()
2103 line = f.readline()
2104 if not line:
2105 break
2106 rlines.append((pos, line))
Ezio Melotti2623a372010-11-21 13:34:58 +00002107 self.assertEqual(rlines, wlines)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002108
Antoine Pitrou19690592009-06-12 20:14:08 +00002109 def test_telling(self):
2110 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002111 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002112 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002113 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002114 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002115 p2 = f.tell()
2116 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002117 self.assertEqual(f.tell(), p0)
2118 self.assertEqual(f.readline(), "\xff\n")
2119 self.assertEqual(f.tell(), p1)
2120 self.assertEqual(f.readline(), "\xff\n")
2121 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002122 f.seek(0)
2123 for line in f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002124 self.assertEqual(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002125 self.assertRaises(IOError, f.tell)
Ezio Melotti2623a372010-11-21 13:34:58 +00002126 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002127 f.close()
2128
Antoine Pitrou19690592009-06-12 20:14:08 +00002129 def test_seeking(self):
2130 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00002131 prefix_size = chunk_size - 2
2132 u_prefix = "a" * prefix_size
2133 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melotti2623a372010-11-21 13:34:58 +00002134 self.assertEqual(len(u_prefix), len(prefix))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002135 u_suffix = "\u8888\n"
2136 suffix = bytes(u_suffix.encode("utf-8"))
2137 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00002138 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002139 f.write(line*2)
2140 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002141 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002142 s = f.read(prefix_size)
Ezio Melotti2623a372010-11-21 13:34:58 +00002143 self.assertEqual(s, prefix.decode("ascii"))
2144 self.assertEqual(f.tell(), prefix_size)
2145 self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002146
Antoine Pitrou19690592009-06-12 20:14:08 +00002147 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002148 # Regression test for a specific bug
2149 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00002150 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002151 f.write(data)
2152 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002153 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002154 f._CHUNK_SIZE # Just test that it exists
2155 f._CHUNK_SIZE = 2
2156 f.readline()
2157 f.tell()
2158
Antoine Pitrou19690592009-06-12 20:14:08 +00002159 def test_seek_and_tell(self):
2160 #Test seek/tell using the StatefulIncrementalDecoder.
2161 # Make test faster by doing smaller seeks
2162 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00002163
Antoine Pitrou19690592009-06-12 20:14:08 +00002164 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002165 """Tell/seek to various points within a data stream and ensure
2166 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00002167 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00002168 f.write(data)
2169 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002170 f = self.open(support.TESTFN, encoding='test_decoder')
2171 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00002172 decoded = f.read()
2173 f.close()
2174
2175 for i in range(min_pos, len(decoded) + 1): # seek positions
2176 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00002177 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melotti2623a372010-11-21 13:34:58 +00002178 self.assertEqual(f.read(i), decoded[:i])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002179 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002180 self.assertEqual(f.read(j), decoded[i:i + j])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002181 f.seek(cookie)
Ezio Melotti2623a372010-11-21 13:34:58 +00002182 self.assertEqual(f.read(), decoded[i:])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002183 f.close()
2184
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002185 # Enable the test decoder.
2186 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00002187
2188 # Run the tests.
2189 try:
2190 # Try each test case.
2191 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00002192 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002193
2194 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00002195 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2196 offset = CHUNK_SIZE - len(input)//2
2197 prefix = b'.'*offset
2198 # Don't bother seeking into the prefix (takes too long).
2199 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00002200 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002201
2202 # Ensure our test decoder won't interfere with subsequent tests.
2203 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002204 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002205
Antoine Pitrou19690592009-06-12 20:14:08 +00002206 def test_encoded_writes(self):
2207 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002208 tests = ("utf-16",
2209 "utf-16-le",
2210 "utf-16-be",
2211 "utf-32",
2212 "utf-32-le",
2213 "utf-32-be")
2214 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002215 buf = self.BytesIO()
2216 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002217 # Check if the BOM is written only once (see issue1753).
2218 f.write(data)
2219 f.write(data)
2220 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002221 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002222 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002223 self.assertEqual(f.read(), data * 2)
2224 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002225
Antoine Pitrou19690592009-06-12 20:14:08 +00002226 def test_unreadable(self):
2227 class UnReadable(self.BytesIO):
2228 def readable(self):
2229 return False
2230 txt = self.TextIOWrapper(UnReadable())
2231 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002232
Antoine Pitrou19690592009-06-12 20:14:08 +00002233 def test_read_one_by_one(self):
2234 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002235 reads = ""
2236 while True:
2237 c = txt.read(1)
2238 if not c:
2239 break
2240 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002241 self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002242
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002243 def test_readlines(self):
2244 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2245 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2246 txt.seek(0)
2247 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2248 txt.seek(0)
2249 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2250
Christian Heimes1a6387e2008-03-26 12:49:49 +00002251 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002252 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002253 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002254 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002255 reads = ""
2256 while True:
2257 c = txt.read(128)
2258 if not c:
2259 break
2260 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002261 self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002262
Antoine Pitroueadca1d2012-10-16 23:02:27 +02002263 def test_writelines(self):
2264 l = ['ab', 'cd', 'ef']
2265 buf = self.BytesIO()
2266 txt = self.TextIOWrapper(buf)
2267 txt.writelines(l)
2268 txt.flush()
2269 self.assertEqual(buf.getvalue(), b'abcdef')
2270
2271 def test_writelines_userlist(self):
2272 l = UserList(['ab', 'cd', 'ef'])
2273 buf = self.BytesIO()
2274 txt = self.TextIOWrapper(buf)
2275 txt.writelines(l)
2276 txt.flush()
2277 self.assertEqual(buf.getvalue(), b'abcdef')
2278
2279 def test_writelines_error(self):
2280 txt = self.TextIOWrapper(self.BytesIO())
2281 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2282 self.assertRaises(TypeError, txt.writelines, None)
2283 self.assertRaises(TypeError, txt.writelines, b'abc')
2284
Christian Heimes1a6387e2008-03-26 12:49:49 +00002285 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002286 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002287
2288 # read one char at a time
2289 reads = ""
2290 while True:
2291 c = txt.read(1)
2292 if not c:
2293 break
2294 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002295 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002296
2297 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002298 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002299 txt._CHUNK_SIZE = 4
2300
2301 reads = ""
2302 while True:
2303 c = txt.read(4)
2304 if not c:
2305 break
2306 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002307 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002308
2309 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002310 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002311 txt._CHUNK_SIZE = 4
2312
2313 reads = txt.read(4)
2314 reads += txt.read(4)
2315 reads += txt.readline()
2316 reads += txt.readline()
2317 reads += txt.readline()
Ezio Melotti2623a372010-11-21 13:34:58 +00002318 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002319
2320 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002321 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002322 txt._CHUNK_SIZE = 4
2323
2324 reads = txt.read(4)
2325 reads += txt.read()
Ezio Melotti2623a372010-11-21 13:34:58 +00002326 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002327
2328 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002329 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002330 txt._CHUNK_SIZE = 4
2331
2332 reads = txt.read(4)
2333 pos = txt.tell()
2334 txt.seek(0)
2335 txt.seek(pos)
Ezio Melotti2623a372010-11-21 13:34:58 +00002336 self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002337
2338 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002339 buffer = self.BytesIO(self.testdata)
2340 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002341
2342 self.assertEqual(buffer.seekable(), txt.seekable())
2343
Antoine Pitrou19690592009-06-12 20:14:08 +00002344 def test_append_bom(self):
2345 # The BOM is not written again when appending to a non-empty file
2346 filename = support.TESTFN
2347 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2348 with self.open(filename, 'w', encoding=charset) as f:
2349 f.write('aaa')
2350 pos = f.tell()
2351 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002352 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002353
2354 with self.open(filename, 'a', encoding=charset) as f:
2355 f.write('xxx')
2356 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002357 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002358
Antoine Pitrou19690592009-06-12 20:14:08 +00002359 def test_seek_bom(self):
2360 # Same test, but when seeking manually
2361 filename = support.TESTFN
2362 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2363 with self.open(filename, 'w', encoding=charset) as f:
2364 f.write('aaa')
2365 pos = f.tell()
2366 with self.open(filename, 'r+', encoding=charset) as f:
2367 f.seek(pos)
2368 f.write('zzz')
2369 f.seek(0)
2370 f.write('bbb')
2371 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002372 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002373
2374 def test_errors_property(self):
2375 with self.open(support.TESTFN, "w") as f:
2376 self.assertEqual(f.errors, "strict")
2377 with self.open(support.TESTFN, "w", errors="replace") as f:
2378 self.assertEqual(f.errors, "replace")
2379
Victor Stinner6a102812010-04-27 23:55:59 +00002380 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002381 def test_threads_write(self):
2382 # Issue6750: concurrent writes could duplicate data
2383 event = threading.Event()
2384 with self.open(support.TESTFN, "w", buffering=1) as f:
2385 def run(n):
2386 text = "Thread%03d\n" % n
2387 event.wait()
2388 f.write(text)
2389 threads = [threading.Thread(target=lambda n=x: run(n))
2390 for x in range(20)]
2391 for t in threads:
2392 t.start()
2393 time.sleep(0.02)
2394 event.set()
2395 for t in threads:
2396 t.join()
2397 with self.open(support.TESTFN) as f:
2398 content = f.read()
2399 for n in range(20):
Ezio Melotti2623a372010-11-21 13:34:58 +00002400 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002401
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002402 def test_flush_error_on_close(self):
2403 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2404 def bad_flush():
2405 raise IOError()
2406 txt.flush = bad_flush
2407 self.assertRaises(IOError, txt.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -06002408 self.assertTrue(txt.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002409
2410 def test_multi_close(self):
2411 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2412 txt.close()
2413 txt.close()
2414 txt.close()
2415 self.assertRaises(ValueError, txt.flush)
2416
Antoine Pitroufc9ead62010-12-21 21:26:55 +00002417 def test_readonly_attributes(self):
2418 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2419 buf = self.BytesIO(self.testdata)
2420 with self.assertRaises((AttributeError, TypeError)):
2421 txt.buffer = buf
2422
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002423 def test_read_nonbytes(self):
2424 # Issue #17106
2425 # Crash when underlying read() returns non-bytes
2426 class NonbytesStream(self.StringIO):
2427 read1 = self.StringIO.read
2428 class NonbytesStream(self.StringIO):
2429 read1 = self.StringIO.read
2430 t = self.TextIOWrapper(NonbytesStream('a'))
2431 with self.maybeRaises(TypeError):
2432 t.read(1)
2433 t = self.TextIOWrapper(NonbytesStream('a'))
2434 with self.maybeRaises(TypeError):
2435 t.readline()
2436 t = self.TextIOWrapper(NonbytesStream('a'))
2437 self.assertEqual(t.read(), u'a')
2438
2439 def test_illegal_decoder(self):
2440 # Issue #17106
2441 # Crash when decoder returns non-string
2442 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2443 encoding='quopri_codec')
2444 with self.maybeRaises(TypeError):
2445 t.read(1)
2446 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2447 encoding='quopri_codec')
2448 with self.maybeRaises(TypeError):
2449 t.readline()
2450 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2451 encoding='quopri_codec')
2452 with self.maybeRaises(TypeError):
2453 t.read()
2454
2455
Antoine Pitrou19690592009-06-12 20:14:08 +00002456class CTextIOWrapperTest(TextIOWrapperTest):
2457
2458 def test_initialization(self):
2459 r = self.BytesIO(b"\xc3\xa9\n\n")
2460 b = self.BufferedReader(r, 1000)
2461 t = self.TextIOWrapper(b)
2462 self.assertRaises(TypeError, t.__init__, b, newline=42)
2463 self.assertRaises(ValueError, t.read)
2464 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2465 self.assertRaises(ValueError, t.read)
2466
2467 def test_garbage_collection(self):
2468 # C TextIOWrapper objects are collected, and collecting them flushes
2469 # all data to disk.
2470 # The Python version has __del__, so it ends in gc.garbage instead.
2471 rawio = io.FileIO(support.TESTFN, "wb")
2472 b = self.BufferedWriter(rawio)
2473 t = self.TextIOWrapper(b, encoding="ascii")
2474 t.write("456def")
2475 t.x = t
2476 wr = weakref.ref(t)
2477 del t
2478 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002479 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002480 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002481 self.assertEqual(f.read(), b"456def")
2482
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002483 def test_rwpair_cleared_before_textio(self):
2484 # Issue 13070: TextIOWrapper's finalization would crash when called
2485 # after the reference to the underlying BufferedRWPair's writer got
2486 # cleared by the GC.
2487 for i in range(1000):
2488 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2489 t1 = self.TextIOWrapper(b1, encoding="ascii")
2490 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2491 t2 = self.TextIOWrapper(b2, encoding="ascii")
2492 # circular references
2493 t1.buddy = t2
2494 t2.buddy = t1
2495 support.gc_collect()
2496
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002497 maybeRaises = unittest.TestCase.assertRaises
2498
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002499
Antoine Pitrou19690592009-06-12 20:14:08 +00002500class PyTextIOWrapperTest(TextIOWrapperTest):
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002501 @contextlib.contextmanager
2502 def maybeRaises(self, *args, **kwds):
2503 yield
Antoine Pitrou19690592009-06-12 20:14:08 +00002504
2505
2506class IncrementalNewlineDecoderTest(unittest.TestCase):
2507
2508 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002509 # UTF-8 specific tests for a newline decoder
2510 def _check_decode(b, s, **kwargs):
2511 # We exercise getstate() / setstate() as well as decode()
2512 state = decoder.getstate()
Ezio Melotti2623a372010-11-21 13:34:58 +00002513 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002514 decoder.setstate(state)
Ezio Melotti2623a372010-11-21 13:34:58 +00002515 self.assertEqual(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002516
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002517 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002518
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002519 _check_decode(b'\xe8', "")
2520 _check_decode(b'\xa2', "")
2521 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002522
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002523 _check_decode(b'\xe8', "")
2524 _check_decode(b'\xa2', "")
2525 _check_decode(b'\x88', "\u8888")
2526
2527 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002528 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2529
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002530 decoder.reset()
2531 _check_decode(b'\n', "\n")
2532 _check_decode(b'\r', "")
2533 _check_decode(b'', "\n", final=True)
2534 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002535
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002536 _check_decode(b'\r', "")
2537 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002538
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002539 _check_decode(b'\r\r\n', "\n\n")
2540 _check_decode(b'\r', "")
2541 _check_decode(b'\r', "\n")
2542 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002543
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002544 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2545 _check_decode(b'\xe8\xa2\x88', "\u8888")
2546 _check_decode(b'\n', "\n")
2547 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2548 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002549
Antoine Pitrou19690592009-06-12 20:14:08 +00002550 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002551 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002552 if encoding is not None:
2553 encoder = codecs.getincrementalencoder(encoding)()
2554 def _decode_bytewise(s):
2555 # Decode one byte at a time
2556 for b in encoder.encode(s):
2557 result.append(decoder.decode(b))
2558 else:
2559 encoder = None
2560 def _decode_bytewise(s):
2561 # Decode one char at a time
2562 for c in s:
2563 result.append(decoder.decode(c))
Ezio Melotti2623a372010-11-21 13:34:58 +00002564 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002565 _decode_bytewise("abc\n\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002566 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002567 _decode_bytewise("\nabc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002568 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002569 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002570 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002571 _decode_bytewise("abc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002572 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002573 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002574 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002575 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002576 input = "abc"
2577 if encoder is not None:
2578 encoder.reset()
2579 input = encoder.encode(input)
Ezio Melotti2623a372010-11-21 13:34:58 +00002580 self.assertEqual(decoder.decode(input), "abc")
2581 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002582
2583 def test_newline_decoder(self):
2584 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002585 # None meaning the IncrementalNewlineDecoder takes unicode input
2586 # rather than bytes input
2587 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002588 'utf-16', 'utf-16-le', 'utf-16-be',
2589 'utf-32', 'utf-32-le', 'utf-32-be',
2590 )
2591 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002592 decoder = enc and codecs.getincrementaldecoder(enc)()
2593 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2594 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002595 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002596 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2597 self.check_newline_decoding_utf8(decoder)
2598
2599 def test_newline_bytes(self):
2600 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2601 def _check(dec):
Ezio Melotti2623a372010-11-21 13:34:58 +00002602 self.assertEqual(dec.newlines, None)
2603 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2604 self.assertEqual(dec.newlines, None)
2605 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2606 self.assertEqual(dec.newlines, None)
Antoine Pitrou19690592009-06-12 20:14:08 +00002607 dec = self.IncrementalNewlineDecoder(None, translate=False)
2608 _check(dec)
2609 dec = self.IncrementalNewlineDecoder(None, translate=True)
2610 _check(dec)
2611
2612class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2613 pass
2614
2615class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2616 pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002617
Christian Heimes1a6387e2008-03-26 12:49:49 +00002618
2619# XXX Tests for open()
2620
2621class MiscIOTest(unittest.TestCase):
2622
Benjamin Petersonad100c32008-11-20 22:06:22 +00002623 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002624 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002625
Antoine Pitrou19690592009-06-12 20:14:08 +00002626 def test___all__(self):
2627 for name in self.io.__all__:
2628 obj = getattr(self.io, name, None)
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002629 self.assertTrue(obj is not None, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002630 if name == "open":
2631 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002632 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002633 self.assertTrue(issubclass(obj, Exception), name)
2634 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002635 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002636
Benjamin Petersonad100c32008-11-20 22:06:22 +00002637 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002638 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002639 self.assertEqual(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002640 f.close()
2641
Antoine Pitrou19690592009-06-12 20:14:08 +00002642 f = self.open(support.TESTFN, "U")
Ezio Melotti2623a372010-11-21 13:34:58 +00002643 self.assertEqual(f.name, support.TESTFN)
2644 self.assertEqual(f.buffer.name, support.TESTFN)
2645 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2646 self.assertEqual(f.mode, "U")
2647 self.assertEqual(f.buffer.mode, "rb")
2648 self.assertEqual(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002649 f.close()
2650
Antoine Pitrou19690592009-06-12 20:14:08 +00002651 f = self.open(support.TESTFN, "w+")
Ezio Melotti2623a372010-11-21 13:34:58 +00002652 self.assertEqual(f.mode, "w+")
2653 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2654 self.assertEqual(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002655
Antoine Pitrou19690592009-06-12 20:14:08 +00002656 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melotti2623a372010-11-21 13:34:58 +00002657 self.assertEqual(g.mode, "wb")
2658 self.assertEqual(g.raw.mode, "wb")
2659 self.assertEqual(g.name, f.fileno())
2660 self.assertEqual(g.raw.name, f.fileno())
Benjamin Petersonad100c32008-11-20 22:06:22 +00002661 f.close()
2662 g.close()
2663
Antoine Pitrou19690592009-06-12 20:14:08 +00002664 def test_io_after_close(self):
2665 for kwargs in [
2666 {"mode": "w"},
2667 {"mode": "wb"},
2668 {"mode": "w", "buffering": 1},
2669 {"mode": "w", "buffering": 2},
2670 {"mode": "wb", "buffering": 0},
2671 {"mode": "r"},
2672 {"mode": "rb"},
2673 {"mode": "r", "buffering": 1},
2674 {"mode": "r", "buffering": 2},
2675 {"mode": "rb", "buffering": 0},
2676 {"mode": "w+"},
2677 {"mode": "w+b"},
2678 {"mode": "w+", "buffering": 1},
2679 {"mode": "w+", "buffering": 2},
2680 {"mode": "w+b", "buffering": 0},
2681 ]:
2682 f = self.open(support.TESTFN, **kwargs)
2683 f.close()
2684 self.assertRaises(ValueError, f.flush)
2685 self.assertRaises(ValueError, f.fileno)
2686 self.assertRaises(ValueError, f.isatty)
2687 self.assertRaises(ValueError, f.__iter__)
2688 if hasattr(f, "peek"):
2689 self.assertRaises(ValueError, f.peek, 1)
2690 self.assertRaises(ValueError, f.read)
2691 if hasattr(f, "read1"):
2692 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinner5100a402011-05-25 22:15:36 +02002693 if hasattr(f, "readall"):
2694 self.assertRaises(ValueError, f.readall)
Antoine Pitrou19690592009-06-12 20:14:08 +00002695 if hasattr(f, "readinto"):
2696 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2697 self.assertRaises(ValueError, f.readline)
2698 self.assertRaises(ValueError, f.readlines)
2699 self.assertRaises(ValueError, f.seek, 0)
2700 self.assertRaises(ValueError, f.tell)
2701 self.assertRaises(ValueError, f.truncate)
2702 self.assertRaises(ValueError, f.write,
2703 b"" if "b" in kwargs['mode'] else "")
2704 self.assertRaises(ValueError, f.writelines, [])
2705 self.assertRaises(ValueError, next, f)
2706
2707 def test_blockingioerror(self):
2708 # Various BlockingIOError issues
2709 self.assertRaises(TypeError, self.BlockingIOError)
2710 self.assertRaises(TypeError, self.BlockingIOError, 1)
2711 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2712 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2713 b = self.BlockingIOError(1, "")
2714 self.assertEqual(b.characters_written, 0)
2715 class C(unicode):
2716 pass
2717 c = C("")
2718 b = self.BlockingIOError(1, c)
2719 c.b = b
2720 b.c = c
2721 wr = weakref.ref(c)
2722 del c, b
2723 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002724 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002725
2726 def test_abcs(self):
2727 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002728 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2729 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2730 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2731 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002732
2733 def _check_abc_inheritance(self, abcmodule):
2734 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002735 self.assertIsInstance(f, abcmodule.IOBase)
2736 self.assertIsInstance(f, abcmodule.RawIOBase)
2737 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2738 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002739 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002740 self.assertIsInstance(f, abcmodule.IOBase)
2741 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2742 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2743 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002744 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002745 self.assertIsInstance(f, abcmodule.IOBase)
2746 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2747 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2748 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002749
2750 def test_abc_inheritance(self):
2751 # Test implementations inherit from their respective ABCs
2752 self._check_abc_inheritance(self)
2753
2754 def test_abc_inheritance_official(self):
2755 # Test implementations inherit from the official ABCs of the
2756 # baseline "io" module.
2757 self._check_abc_inheritance(io)
2758
Antoine Pitrou5aa7df32011-11-21 20:16:44 +01002759 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2760 def test_nonblock_pipe_write_bigbuf(self):
2761 self._test_nonblock_pipe_write(16*1024)
2762
2763 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2764 def test_nonblock_pipe_write_smallbuf(self):
2765 self._test_nonblock_pipe_write(1024)
2766
2767 def _set_non_blocking(self, fd):
2768 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2769 self.assertNotEqual(flags, -1)
2770 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2771 self.assertEqual(res, 0)
2772
2773 def _test_nonblock_pipe_write(self, bufsize):
2774 sent = []
2775 received = []
2776 r, w = os.pipe()
2777 self._set_non_blocking(r)
2778 self._set_non_blocking(w)
2779
2780 # To exercise all code paths in the C implementation we need
2781 # to play with buffer sizes. For instance, if we choose a
2782 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2783 # then we will never get a partial write of the buffer.
2784 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2785 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2786
2787 with rf, wf:
2788 for N in 9999, 73, 7574:
2789 try:
2790 i = 0
2791 while True:
2792 msg = bytes([i % 26 + 97]) * N
2793 sent.append(msg)
2794 wf.write(msg)
2795 i += 1
2796
2797 except self.BlockingIOError as e:
2798 self.assertEqual(e.args[0], errno.EAGAIN)
2799 sent[-1] = sent[-1][:e.characters_written]
2800 received.append(rf.read())
2801 msg = b'BLOCKED'
2802 wf.write(msg)
2803 sent.append(msg)
2804
2805 while True:
2806 try:
2807 wf.flush()
2808 break
2809 except self.BlockingIOError as e:
2810 self.assertEqual(e.args[0], errno.EAGAIN)
2811 self.assertEqual(e.characters_written, 0)
2812 received.append(rf.read())
2813
2814 received += iter(rf.read, None)
2815
2816 sent, received = b''.join(sent), b''.join(received)
2817 self.assertTrue(sent == received)
2818 self.assertTrue(wf.closed)
2819 self.assertTrue(rf.closed)
2820
Antoine Pitrou19690592009-06-12 20:14:08 +00002821class CMiscIOTest(MiscIOTest):
2822 io = io
2823
2824class PyMiscIOTest(MiscIOTest):
2825 io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002826
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002827
2828@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2829class SignalsTest(unittest.TestCase):
2830
2831 def setUp(self):
2832 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2833
2834 def tearDown(self):
2835 signal.signal(signal.SIGALRM, self.oldalrm)
2836
2837 def alarm_interrupt(self, sig, frame):
Florent Xicluna3fa3b002010-09-13 08:20:19 +00002838 1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002839
2840 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinner49d495f2011-07-04 11:44:46 +02002841 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
2842 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002843 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2844 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou6439c002011-02-25 21:35:47 +00002845 invokes the signal handler, and bubbles up the exception raised
2846 in the latter."""
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002847 read_results = []
2848 def _read():
2849 s = os.read(r, 1)
2850 read_results.append(s)
2851 t = threading.Thread(target=_read)
2852 t.daemon = True
2853 r, w = os.pipe()
2854 try:
2855 wio = self.io.open(w, **fdopen_kwargs)
2856 t.start()
2857 signal.alarm(1)
2858 # Fill the pipe enough that the write will be blocking.
2859 # It will be interrupted by the timer armed above. Since the
2860 # other thread has read one byte, the low-level write will
2861 # return with a successful (partial) result rather than an EINTR.
2862 # The buffered IO layer must check for pending signal
2863 # handlers, which in this case will invoke alarm_interrupt().
2864 self.assertRaises(ZeroDivisionError,
2865 wio.write, item * (1024 * 1024))
2866 t.join()
2867 # We got one byte, get another one and check that it isn't a
2868 # repeat of the first one.
2869 read_results.append(os.read(r, 1))
2870 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2871 finally:
2872 os.close(w)
2873 os.close(r)
2874 # This is deliberate. If we didn't close the file descriptor
2875 # before closing wio, wio would try to flush its internal
2876 # buffer, and block again.
2877 try:
2878 wio.close()
2879 except IOError as e:
2880 if e.errno != errno.EBADF:
2881 raise
2882
2883 def test_interrupted_write_unbuffered(self):
2884 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2885
2886 def test_interrupted_write_buffered(self):
2887 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2888
2889 def test_interrupted_write_text(self):
2890 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2891
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002892 def check_reentrant_write(self, data, **fdopen_kwargs):
2893 def on_alarm(*args):
2894 # Will be called reentrantly from the same thread
2895 wio.write(data)
Victor Stinner4c41f842011-07-05 13:29:26 +02002896 1//0
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002897 signal.signal(signal.SIGALRM, on_alarm)
2898 r, w = os.pipe()
2899 wio = self.io.open(w, **fdopen_kwargs)
2900 try:
2901 signal.alarm(1)
2902 # Either the reentrant call to wio.write() fails with RuntimeError,
2903 # or the signal handler raises ZeroDivisionError.
2904 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2905 while 1:
2906 for i in range(100):
2907 wio.write(data)
2908 wio.flush()
2909 # Make sure the buffer doesn't fill up and block further writes
2910 os.read(r, len(data) * 100)
2911 exc = cm.exception
2912 if isinstance(exc, RuntimeError):
2913 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2914 finally:
2915 wio.close()
2916 os.close(r)
2917
2918 def test_reentrant_write_buffered(self):
2919 self.check_reentrant_write(b"xy", mode="wb")
2920
2921 def test_reentrant_write_text(self):
2922 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2923
Antoine Pitrou6439c002011-02-25 21:35:47 +00002924 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2925 """Check that a buffered read, when it gets interrupted (either
2926 returning a partial result or EINTR), properly invokes the signal
2927 handler and retries if the latter returned successfully."""
2928 r, w = os.pipe()
2929 fdopen_kwargs["closefd"] = False
2930 def alarm_handler(sig, frame):
2931 os.write(w, b"bar")
2932 signal.signal(signal.SIGALRM, alarm_handler)
2933 try:
2934 rio = self.io.open(r, **fdopen_kwargs)
2935 os.write(w, b"foo")
2936 signal.alarm(1)
2937 # Expected behaviour:
2938 # - first raw read() returns partial b"foo"
2939 # - second raw read() returns EINTR
2940 # - third raw read() returns b"bar"
2941 self.assertEqual(decode(rio.read(6)), "foobar")
2942 finally:
2943 rio.close()
2944 os.close(w)
2945 os.close(r)
2946
2947 def test_interrupterd_read_retry_buffered(self):
2948 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
2949 mode="rb")
2950
2951 def test_interrupterd_read_retry_text(self):
2952 self.check_interrupted_read_retry(lambda x: x,
2953 mode="r")
2954
2955 @unittest.skipUnless(threading, 'Threading required for this test.')
2956 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
2957 """Check that a buffered write, when it gets interrupted (either
2958 returning a partial result or EINTR), properly invokes the signal
2959 handler and retries if the latter returned successfully."""
2960 select = support.import_module("select")
2961 # A quantity that exceeds the buffer size of an anonymous pipe's
2962 # write end.
2963 N = 1024 * 1024
2964 r, w = os.pipe()
2965 fdopen_kwargs["closefd"] = False
2966 # We need a separate thread to read from the pipe and allow the
2967 # write() to finish. This thread is started after the SIGALRM is
2968 # received (forcing a first EINTR in write()).
2969 read_results = []
2970 write_finished = False
2971 def _read():
2972 while not write_finished:
2973 while r in select.select([r], [], [], 1.0)[0]:
2974 s = os.read(r, 1024)
2975 read_results.append(s)
2976 t = threading.Thread(target=_read)
2977 t.daemon = True
2978 def alarm1(sig, frame):
2979 signal.signal(signal.SIGALRM, alarm2)
2980 signal.alarm(1)
2981 def alarm2(sig, frame):
2982 t.start()
2983 signal.signal(signal.SIGALRM, alarm1)
2984 try:
2985 wio = self.io.open(w, **fdopen_kwargs)
2986 signal.alarm(1)
2987 # Expected behaviour:
2988 # - first raw write() is partial (because of the limited pipe buffer
2989 # and the first alarm)
2990 # - second raw write() returns EINTR (because of the second alarm)
2991 # - subsequent write()s are successful (either partial or complete)
2992 self.assertEqual(N, wio.write(item * N))
2993 wio.flush()
2994 write_finished = True
2995 t.join()
2996 self.assertEqual(N, sum(len(x) for x in read_results))
2997 finally:
2998 write_finished = True
2999 os.close(w)
3000 os.close(r)
3001 # This is deliberate. If we didn't close the file descriptor
3002 # before closing wio, wio would try to flush its internal
3003 # buffer, and could block (in case of failure).
3004 try:
3005 wio.close()
3006 except IOError as e:
3007 if e.errno != errno.EBADF:
3008 raise
3009
3010 def test_interrupterd_write_retry_buffered(self):
3011 self.check_interrupted_write_retry(b"x", mode="wb")
3012
3013 def test_interrupterd_write_retry_text(self):
3014 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3015
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003016
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003017class CSignalsTest(SignalsTest):
3018 io = io
3019
3020class PySignalsTest(SignalsTest):
3021 io = pyio
3022
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003023 # Handling reentrancy issues would slow down _pyio even more, so the
3024 # tests are disabled.
3025 test_reentrant_write_buffered = None
3026 test_reentrant_write_text = None
3027
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003028
Christian Heimes1a6387e2008-03-26 12:49:49 +00003029def test_main():
Antoine Pitrou19690592009-06-12 20:14:08 +00003030 tests = (CIOTest, PyIOTest,
3031 CBufferedReaderTest, PyBufferedReaderTest,
3032 CBufferedWriterTest, PyBufferedWriterTest,
3033 CBufferedRWPairTest, PyBufferedRWPairTest,
3034 CBufferedRandomTest, PyBufferedRandomTest,
3035 StatefulIncrementalDecoderTest,
3036 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3037 CTextIOWrapperTest, PyTextIOWrapperTest,
3038 CMiscIOTest, PyMiscIOTest,
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003039 CSignalsTest, PySignalsTest,
Antoine Pitrou19690592009-06-12 20:14:08 +00003040 )
3041
3042 # Put the namespaces of the IO module we are testing and some useful mock
3043 # classes in the __dict__ of each test.
3044 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou6391b342010-09-14 18:48:19 +00003045 MockNonBlockWriterIO, MockRawIOWithoutRead)
Antoine Pitrou19690592009-06-12 20:14:08 +00003046 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3047 c_io_ns = dict((name, getattr(io, name)) for name in all_members)
3048 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
3049 globs = globals()
3050 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3051 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3052 # Avoid turning open into a bound method.
3053 py_io_ns["open"] = pyio.OpenWrapper
3054 for test in tests:
3055 if test.__name__.startswith("C"):
3056 for name, obj in c_io_ns.items():
3057 setattr(test, name, obj)
3058 elif test.__name__.startswith("Py"):
3059 for name, obj in py_io_ns.items():
3060 setattr(test, name, obj)
3061
3062 support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00003063
3064if __name__ == "__main__":
Antoine Pitrou19690592009-06-12 20:14:08 +00003065 test_main()