blob: 72feff59e4e0c082a656150eafe10151e683c627 [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Antoine Pitrou19690592009-06-12 20:14:08 +000032import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000033import signal
34import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000035from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000036from collections import deque
37from test import test_support as support
Christian Heimes1a6387e2008-03-26 12:49:49 +000038
39import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000042try:
43 import threading
44except ImportError:
45 threading = None
Antoine Pitrou5aa7df32011-11-21 20:16:44 +010046try:
47 import fcntl
48except ImportError:
49 fcntl = None
Antoine Pitrou19690592009-06-12 20:14:08 +000050
51__metaclass__ = type
52bytes = support.py3k_bytes
53
54def _default_chunk_size():
55 """Get the default TextIOWrapper chunk size"""
56 with io.open(__file__, "r", encoding="latin1") as f:
57 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000058
59
Antoine Pitrou6391b342010-09-14 18:48:19 +000060class MockRawIOWithoutRead:
61 """A RawIO implementation without read(), so as to exercise the default
62 RawIO.read() which calls readinto()."""
Christian Heimes1a6387e2008-03-26 12:49:49 +000063
64 def __init__(self, read_stack=()):
65 self._read_stack = list(read_stack)
66 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +000067 self._reads = 0
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000068 self._extraneous_reads = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +000069
Christian Heimes1a6387e2008-03-26 12:49:49 +000070 def write(self, b):
Antoine Pitrou19690592009-06-12 20:14:08 +000071 self._write_stack.append(bytes(b))
Christian Heimes1a6387e2008-03-26 12:49:49 +000072 return len(b)
73
74 def writable(self):
75 return True
76
77 def fileno(self):
78 return 42
79
80 def readable(self):
81 return True
82
83 def seekable(self):
84 return True
85
86 def seek(self, pos, whence):
Antoine Pitrou19690592009-06-12 20:14:08 +000087 return 0 # wrong but we gotta return something
Christian Heimes1a6387e2008-03-26 12:49:49 +000088
89 def tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +000090 return 0 # same comment as above
91
92 def readinto(self, buf):
93 self._reads += 1
94 max_len = len(buf)
95 try:
96 data = self._read_stack[0]
97 except IndexError:
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000098 self._extraneous_reads += 1
Antoine Pitrou19690592009-06-12 20:14:08 +000099 return 0
100 if data is None:
101 del self._read_stack[0]
102 return None
103 n = len(data)
104 if len(data) <= max_len:
105 del self._read_stack[0]
106 buf[:n] = data
107 return n
108 else:
109 buf[:] = data[:max_len]
110 self._read_stack[0] = data[max_len:]
111 return max_len
112
113 def truncate(self, pos=None):
114 return pos
115
Antoine Pitrou6391b342010-09-14 18:48:19 +0000116class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
117 pass
118
119class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
120 pass
121
122
123class MockRawIO(MockRawIOWithoutRead):
124
125 def read(self, n=None):
126 self._reads += 1
127 try:
128 return self._read_stack.pop(0)
129 except:
130 self._extraneous_reads += 1
131 return b""
132
Antoine Pitrou19690592009-06-12 20:14:08 +0000133class CMockRawIO(MockRawIO, io.RawIOBase):
134 pass
135
136class PyMockRawIO(MockRawIO, pyio.RawIOBase):
137 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000138
139
Antoine Pitrou19690592009-06-12 20:14:08 +0000140class MisbehavedRawIO(MockRawIO):
141 def write(self, b):
142 return MockRawIO.write(self, b) * 2
143
144 def read(self, n=None):
145 return MockRawIO.read(self, n) * 2
146
147 def seek(self, pos, whence):
148 return -123
149
150 def tell(self):
151 return -456
152
153 def readinto(self, buf):
154 MockRawIO.readinto(self, buf)
155 return len(buf) * 5
156
157class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
158 pass
159
160class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
161 pass
162
163
164class CloseFailureIO(MockRawIO):
165 closed = 0
166
167 def close(self):
168 if not self.closed:
169 self.closed = 1
170 raise IOError
171
172class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
173 pass
174
175class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
176 pass
177
178
179class MockFileIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000180
181 def __init__(self, data):
182 self.read_history = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000183 super(MockFileIO, self).__init__(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000184
185 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +0000186 res = super(MockFileIO, self).read(n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000187 self.read_history.append(None if res is None else len(res))
188 return res
189
Antoine Pitrou19690592009-06-12 20:14:08 +0000190 def readinto(self, b):
191 res = super(MockFileIO, self).readinto(b)
192 self.read_history.append(res)
193 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000194
Antoine Pitrou19690592009-06-12 20:14:08 +0000195class CMockFileIO(MockFileIO, io.BytesIO):
196 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000197
Antoine Pitrou19690592009-06-12 20:14:08 +0000198class PyMockFileIO(MockFileIO, pyio.BytesIO):
199 pass
200
201
202class MockNonBlockWriterIO:
203
204 def __init__(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000205 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000206 self._blocker_char = None
Christian Heimes1a6387e2008-03-26 12:49:49 +0000207
Antoine Pitrou19690592009-06-12 20:14:08 +0000208 def pop_written(self):
209 s = b"".join(self._write_stack)
210 self._write_stack[:] = []
211 return s
212
213 def block_on(self, char):
214 """Block when a given char is encountered."""
215 self._blocker_char = char
216
217 def readable(self):
218 return True
219
220 def seekable(self):
221 return True
Christian Heimes1a6387e2008-03-26 12:49:49 +0000222
223 def writable(self):
224 return True
225
Antoine Pitrou19690592009-06-12 20:14:08 +0000226 def write(self, b):
227 b = bytes(b)
228 n = -1
229 if self._blocker_char:
230 try:
231 n = b.index(self._blocker_char)
232 except ValueError:
233 pass
234 else:
Antoine Pitrou5aa7df32011-11-21 20:16:44 +0100235 if n > 0:
236 # write data up to the first blocker
237 self._write_stack.append(b[:n])
238 return n
239 else:
240 # cancel blocker and indicate would block
241 self._blocker_char = None
242 return None
Antoine Pitrou19690592009-06-12 20:14:08 +0000243 self._write_stack.append(b)
244 return len(b)
245
246class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
247 BlockingIOError = io.BlockingIOError
248
249class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
250 BlockingIOError = pyio.BlockingIOError
251
Christian Heimes1a6387e2008-03-26 12:49:49 +0000252
253class IOTest(unittest.TestCase):
254
Antoine Pitrou19690592009-06-12 20:14:08 +0000255 def setUp(self):
256 support.unlink(support.TESTFN)
257
Christian Heimes1a6387e2008-03-26 12:49:49 +0000258 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000259 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000260
261 def write_ops(self, f):
262 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000263 f.truncate(0)
264 self.assertEqual(f.tell(), 5)
265 f.seek(0)
266
267 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000268 self.assertEqual(f.seek(0), 0)
269 self.assertEqual(f.write(b"Hello."), 6)
270 self.assertEqual(f.tell(), 6)
271 self.assertEqual(f.seek(-1, 1), 5)
272 self.assertEqual(f.tell(), 5)
273 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
274 self.assertEqual(f.seek(0), 0)
275 self.assertEqual(f.write(b"h"), 1)
276 self.assertEqual(f.seek(-1, 2), 13)
277 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000278
Christian Heimes1a6387e2008-03-26 12:49:49 +0000279 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000280 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000281 self.assertRaises(TypeError, f.seek, 0.0)
282
283 def read_ops(self, f, buffered=False):
284 data = f.read(5)
285 self.assertEqual(data, b"hello")
286 data = bytearray(data)
287 self.assertEqual(f.readinto(data), 5)
288 self.assertEqual(data, b" worl")
289 self.assertEqual(f.readinto(data), 2)
290 self.assertEqual(len(data), 5)
291 self.assertEqual(data[:2], b"d\n")
292 self.assertEqual(f.seek(0), 0)
293 self.assertEqual(f.read(20), b"hello world\n")
294 self.assertEqual(f.read(1), b"")
295 self.assertEqual(f.readinto(bytearray(b"x")), 0)
296 self.assertEqual(f.seek(-6, 2), 6)
297 self.assertEqual(f.read(5), b"world")
298 self.assertEqual(f.read(0), b"")
299 self.assertEqual(f.readinto(bytearray()), 0)
300 self.assertEqual(f.seek(-6, 1), 5)
301 self.assertEqual(f.read(5), b" worl")
302 self.assertEqual(f.tell(), 10)
303 self.assertRaises(TypeError, f.seek, 0.0)
304 if buffered:
305 f.seek(0)
306 self.assertEqual(f.read(), b"hello world\n")
307 f.seek(6)
308 self.assertEqual(f.read(), b"world\n")
309 self.assertEqual(f.read(), b"")
310
311 LARGE = 2**31
312
313 def large_file_ops(self, f):
314 assert f.readable()
315 assert f.writable()
316 self.assertEqual(f.seek(self.LARGE), self.LARGE)
317 self.assertEqual(f.tell(), self.LARGE)
318 self.assertEqual(f.write(b"xxx"), 3)
319 self.assertEqual(f.tell(), self.LARGE + 3)
320 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
321 self.assertEqual(f.truncate(), self.LARGE + 2)
322 self.assertEqual(f.tell(), self.LARGE + 2)
323 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
324 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000325 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000326 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
327 self.assertEqual(f.seek(-1, 2), self.LARGE)
328 self.assertEqual(f.read(2), b"x")
329
Antoine Pitrou19690592009-06-12 20:14:08 +0000330 def test_invalid_operations(self):
331 # Try writing on a file opened in read mode and vice-versa.
332 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000333 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000334 self.assertRaises(IOError, fp.read)
335 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000336 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000337 self.assertRaises(IOError, fp.write, b"blah")
338 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000339 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000340 self.assertRaises(IOError, fp.write, "blah")
341 self.assertRaises(IOError, fp.writelines, ["blah\n"])
342
Christian Heimes1a6387e2008-03-26 12:49:49 +0000343 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000344 with self.open(support.TESTFN, "wb", buffering=0) as f:
345 self.assertEqual(f.readable(), False)
346 self.assertEqual(f.writable(), True)
347 self.assertEqual(f.seekable(), True)
348 self.write_ops(f)
349 with self.open(support.TESTFN, "rb", buffering=0) as f:
350 self.assertEqual(f.readable(), True)
351 self.assertEqual(f.writable(), False)
352 self.assertEqual(f.seekable(), True)
353 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000354
355 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000356 with self.open(support.TESTFN, "wb") as f:
357 self.assertEqual(f.readable(), False)
358 self.assertEqual(f.writable(), True)
359 self.assertEqual(f.seekable(), True)
360 self.write_ops(f)
361 with self.open(support.TESTFN, "rb") as f:
362 self.assertEqual(f.readable(), True)
363 self.assertEqual(f.writable(), False)
364 self.assertEqual(f.seekable(), True)
365 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000366
367 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000368 with self.open(support.TESTFN, "wb") as f:
369 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
370 with self.open(support.TESTFN, "rb") as f:
371 self.assertEqual(f.readline(), b"abc\n")
372 self.assertEqual(f.readline(10), b"def\n")
373 self.assertEqual(f.readline(2), b"xy")
374 self.assertEqual(f.readline(4), b"zzy\n")
375 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000376 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000377 self.assertRaises(TypeError, f.readline, 5.3)
378 with self.open(support.TESTFN, "r") as f:
379 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000380
381 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000382 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000383 self.write_ops(f)
384 data = f.getvalue()
385 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000386 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000387 self.read_ops(f, True)
388
389 def test_large_file_ops(self):
390 # On Windows and Mac OSX this test comsumes large resources; It takes
391 # a long time to build the >2GB file and takes >2GB of disk space
392 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000393 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
394 if not support.is_resource_enabled("largefile"):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000395 print("\nTesting large file ops skipped on %s." % sys.platform,
396 file=sys.stderr)
397 print("It requires %d bytes and a long time." % self.LARGE,
398 file=sys.stderr)
399 print("Use 'regrtest.py -u largefile test_io' to run it.",
400 file=sys.stderr)
401 return
Antoine Pitrou19690592009-06-12 20:14:08 +0000402 with self.open(support.TESTFN, "w+b", 0) as f:
403 self.large_file_ops(f)
404 with self.open(support.TESTFN, "w+b") as f:
405 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000406
407 def test_with_open(self):
408 for bufsize in (0, 1, 100):
409 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000410 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000411 f.write(b"xxx")
412 self.assertEqual(f.closed, True)
413 f = None
414 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000415 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000416 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000417 except ZeroDivisionError:
418 self.assertEqual(f.closed, True)
419 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000420 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000421
Antoine Pitroue741cc62009-01-21 00:45:36 +0000422 # issue 5008
423 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000424 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000425 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000426 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000427 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000428 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000429 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000430 with self.open(support.TESTFN, "a") as f:
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000431 self.assertTrue(f.tell() > 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000432
Christian Heimes1a6387e2008-03-26 12:49:49 +0000433 def test_destructor(self):
434 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000435 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000436 def __del__(self):
437 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000438 try:
439 f = super(MyFileIO, self).__del__
440 except AttributeError:
441 pass
442 else:
443 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000444 def close(self):
445 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000446 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000447 def flush(self):
448 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000449 super(MyFileIO, self).flush()
450 f = MyFileIO(support.TESTFN, "wb")
451 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000452 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000453 support.gc_collect()
454 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000455 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000456 self.assertEqual(f.read(), b"xxx")
457
458 def _check_base_destructor(self, base):
459 record = []
460 class MyIO(base):
461 def __init__(self):
462 # This exercises the availability of attributes on object
463 # destruction.
464 # (in the C version, close() is called by the tp_dealloc
465 # function, not by __del__)
466 self.on_del = 1
467 self.on_close = 2
468 self.on_flush = 3
469 def __del__(self):
470 record.append(self.on_del)
471 try:
472 f = super(MyIO, self).__del__
473 except AttributeError:
474 pass
475 else:
476 f()
477 def close(self):
478 record.append(self.on_close)
479 super(MyIO, self).close()
480 def flush(self):
481 record.append(self.on_flush)
482 super(MyIO, self).flush()
483 f = MyIO()
484 del f
485 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000486 self.assertEqual(record, [1, 2, 3])
487
Antoine Pitrou19690592009-06-12 20:14:08 +0000488 def test_IOBase_destructor(self):
489 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000490
Antoine Pitrou19690592009-06-12 20:14:08 +0000491 def test_RawIOBase_destructor(self):
492 self._check_base_destructor(self.RawIOBase)
493
494 def test_BufferedIOBase_destructor(self):
495 self._check_base_destructor(self.BufferedIOBase)
496
497 def test_TextIOBase_destructor(self):
498 self._check_base_destructor(self.TextIOBase)
499
500 def test_close_flushes(self):
501 with self.open(support.TESTFN, "wb") as f:
502 f.write(b"xxx")
503 with self.open(support.TESTFN, "rb") as f:
504 self.assertEqual(f.read(), b"xxx")
505
506 def test_array_writes(self):
507 a = array.array(b'i', range(10))
508 n = len(a.tostring())
509 with self.open(support.TESTFN, "wb", 0) as f:
510 self.assertEqual(f.write(a), n)
511 with self.open(support.TESTFN, "wb") as f:
512 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000513
514 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000515 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000516 closefd=False)
517
Antoine Pitrou19690592009-06-12 20:14:08 +0000518 def test_read_closed(self):
519 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000520 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000521 with self.open(support.TESTFN, "r") as f:
522 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000523 self.assertEqual(file.read(), "egg\n")
524 file.seek(0)
525 file.close()
526 self.assertRaises(ValueError, file.read)
527
528 def test_no_closefd_with_filename(self):
529 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000530 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000531
532 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000533 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000534 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000535 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000536 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000537 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000538 self.assertEqual(file.buffer.raw.closefd, False)
539
Antoine Pitrou19690592009-06-12 20:14:08 +0000540 def test_garbage_collection(self):
541 # FileIO objects are collected, and collecting them flushes
542 # all data to disk.
543 f = self.FileIO(support.TESTFN, "wb")
544 f.write(b"abcxxx")
545 f.f = f
546 wr = weakref.ref(f)
547 del f
548 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000549 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000550 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000551 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000552
Antoine Pitrou19690592009-06-12 20:14:08 +0000553 def test_unbounded_file(self):
554 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
555 zero = "/dev/zero"
556 if not os.path.exists(zero):
557 self.skipTest("{0} does not exist".format(zero))
558 if sys.maxsize > 0x7FFFFFFF:
559 self.skipTest("test can only run in a 32-bit address space")
560 if support.real_max_memuse < support._2G:
561 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000562 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000563 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000564 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000565 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000566 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000567 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000568
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000569 def test_flush_error_on_close(self):
570 f = self.open(support.TESTFN, "wb", buffering=0)
571 def bad_flush():
572 raise IOError()
573 f.flush = bad_flush
574 self.assertRaises(IOError, f.close) # exception not swallowed
575
576 def test_multi_close(self):
577 f = self.open(support.TESTFN, "wb", buffering=0)
578 f.close()
579 f.close()
580 f.close()
581 self.assertRaises(ValueError, f.flush)
582
Antoine Pitrou6391b342010-09-14 18:48:19 +0000583 def test_RawIOBase_read(self):
584 # Exercise the default RawIOBase.read() implementation (which calls
585 # readinto() internally).
586 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
587 self.assertEqual(rawio.read(2), b"ab")
588 self.assertEqual(rawio.read(2), b"c")
589 self.assertEqual(rawio.read(2), b"d")
590 self.assertEqual(rawio.read(2), None)
591 self.assertEqual(rawio.read(2), b"ef")
592 self.assertEqual(rawio.read(2), b"g")
593 self.assertEqual(rawio.read(2), None)
594 self.assertEqual(rawio.read(2), b"")
595
Hynek Schlawack877effc2012-05-25 09:24:18 +0200596 def test_fileio_closefd(self):
597 # Issue #4841
598 with self.open(__file__, 'rb') as f1, \
599 self.open(__file__, 'rb') as f2:
600 fileio = self.FileIO(f1.fileno(), closefd=False)
601 # .__init__() must not close f1
602 fileio.__init__(f2.fileno(), closefd=False)
603 f1.readline()
604 # .close() must not close f2
605 fileio.close()
606 f2.readline()
607
608
Antoine Pitrou19690592009-06-12 20:14:08 +0000609class CIOTest(IOTest):
Antoine Pitrou16166452011-07-12 22:04:20 +0200610
611 def test_IOBase_finalize(self):
612 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
613 # class which inherits IOBase and an object of this class are caught
614 # in a reference cycle and close() is already in the method cache.
615 class MyIO(self.IOBase):
616 def close(self):
617 pass
618
619 # create an instance to populate the method cache
620 MyIO()
621 obj = MyIO()
622 obj.obj = obj
623 wr = weakref.ref(obj)
624 del MyIO
625 del obj
626 support.gc_collect()
627 self.assertTrue(wr() is None, wr)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000628
Antoine Pitrou19690592009-06-12 20:14:08 +0000629class PyIOTest(IOTest):
630 test_array_writes = unittest.skip(
631 "len(array.array) returns number of elements rather than bytelength"
632 )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000633
634
Antoine Pitrou19690592009-06-12 20:14:08 +0000635class CommonBufferedTests:
636 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
637
638 def test_detach(self):
639 raw = self.MockRawIO()
640 buf = self.tp(raw)
641 self.assertIs(buf.detach(), raw)
642 self.assertRaises(ValueError, buf.detach)
643
644 def test_fileno(self):
645 rawio = self.MockRawIO()
646 bufio = self.tp(rawio)
647
Ezio Melotti2623a372010-11-21 13:34:58 +0000648 self.assertEqual(42, bufio.fileno())
Antoine Pitrou19690592009-06-12 20:14:08 +0000649
650 def test_no_fileno(self):
651 # XXX will we always have fileno() function? If so, kill
652 # this test. Else, write it.
653 pass
654
655 def test_invalid_args(self):
656 rawio = self.MockRawIO()
657 bufio = self.tp(rawio)
658 # Invalid whence
659 self.assertRaises(ValueError, bufio.seek, 0, -1)
660 self.assertRaises(ValueError, bufio.seek, 0, 3)
661
662 def test_override_destructor(self):
663 tp = self.tp
664 record = []
665 class MyBufferedIO(tp):
666 def __del__(self):
667 record.append(1)
668 try:
669 f = super(MyBufferedIO, self).__del__
670 except AttributeError:
671 pass
672 else:
673 f()
674 def close(self):
675 record.append(2)
676 super(MyBufferedIO, self).close()
677 def flush(self):
678 record.append(3)
679 super(MyBufferedIO, self).flush()
680 rawio = self.MockRawIO()
681 bufio = MyBufferedIO(rawio)
682 writable = bufio.writable()
683 del bufio
684 support.gc_collect()
685 if writable:
686 self.assertEqual(record, [1, 2, 3])
687 else:
688 self.assertEqual(record, [1, 2])
689
690 def test_context_manager(self):
691 # Test usability as a context manager
692 rawio = self.MockRawIO()
693 bufio = self.tp(rawio)
694 def _with():
695 with bufio:
696 pass
697 _with()
698 # bufio should now be closed, and using it a second time should raise
699 # a ValueError.
700 self.assertRaises(ValueError, _with)
701
702 def test_error_through_destructor(self):
703 # Test that the exception state is not modified by a destructor,
704 # even if close() fails.
705 rawio = self.CloseFailureIO()
706 def f():
707 self.tp(rawio).xyzzy
708 with support.captured_output("stderr") as s:
709 self.assertRaises(AttributeError, f)
710 s = s.getvalue().strip()
711 if s:
712 # The destructor *may* have printed an unraisable error, check it
713 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000714 self.assertTrue(s.startswith("Exception IOError: "), s)
715 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000716
717 def test_repr(self):
718 raw = self.MockRawIO()
719 b = self.tp(raw)
720 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
721 self.assertEqual(repr(b), "<%s>" % clsname)
722 raw.name = "dummy"
723 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
724 raw.name = b"dummy"
725 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000726
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000727 def test_flush_error_on_close(self):
728 raw = self.MockRawIO()
729 def bad_flush():
730 raise IOError()
731 raw.flush = bad_flush
732 b = self.tp(raw)
733 self.assertRaises(IOError, b.close) # exception not swallowed
734
735 def test_multi_close(self):
736 raw = self.MockRawIO()
737 b = self.tp(raw)
738 b.close()
739 b.close()
740 b.close()
741 self.assertRaises(ValueError, b.flush)
742
Antoine Pitroufc9ead62010-12-21 21:26:55 +0000743 def test_readonly_attributes(self):
744 raw = self.MockRawIO()
745 buf = self.tp(raw)
746 x = self.MockRawIO()
747 with self.assertRaises((AttributeError, TypeError)):
748 buf.raw = x
749
Christian Heimes1a6387e2008-03-26 12:49:49 +0000750
Antoine Pitrou19690592009-06-12 20:14:08 +0000751class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
752 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000753
Antoine Pitrou19690592009-06-12 20:14:08 +0000754 def test_constructor(self):
755 rawio = self.MockRawIO([b"abc"])
756 bufio = self.tp(rawio)
757 bufio.__init__(rawio)
758 bufio.__init__(rawio, buffer_size=1024)
759 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000760 self.assertEqual(b"abc", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000761 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
762 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
763 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
764 rawio = self.MockRawIO([b"abc"])
765 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000766 self.assertEqual(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000767
Antoine Pitrou19690592009-06-12 20:14:08 +0000768 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000769 for arg in (None, 7):
770 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
771 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000772 self.assertEqual(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000773 # Invalid args
774 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000775
Antoine Pitrou19690592009-06-12 20:14:08 +0000776 def test_read1(self):
777 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
778 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000779 self.assertEqual(b"a", bufio.read(1))
780 self.assertEqual(b"b", bufio.read1(1))
781 self.assertEqual(rawio._reads, 1)
782 self.assertEqual(b"c", bufio.read1(100))
783 self.assertEqual(rawio._reads, 1)
784 self.assertEqual(b"d", bufio.read1(100))
785 self.assertEqual(rawio._reads, 2)
786 self.assertEqual(b"efg", bufio.read1(100))
787 self.assertEqual(rawio._reads, 3)
788 self.assertEqual(b"", bufio.read1(100))
789 self.assertEqual(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000790 # Invalid args
791 self.assertRaises(ValueError, bufio.read1, -1)
792
793 def test_readinto(self):
794 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
795 bufio = self.tp(rawio)
796 b = bytearray(2)
Ezio Melotti2623a372010-11-21 13:34:58 +0000797 self.assertEqual(bufio.readinto(b), 2)
798 self.assertEqual(b, b"ab")
799 self.assertEqual(bufio.readinto(b), 2)
800 self.assertEqual(b, b"cd")
801 self.assertEqual(bufio.readinto(b), 2)
802 self.assertEqual(b, b"ef")
803 self.assertEqual(bufio.readinto(b), 1)
804 self.assertEqual(b, b"gf")
805 self.assertEqual(bufio.readinto(b), 0)
806 self.assertEqual(b, b"gf")
Antoine Pitrou19690592009-06-12 20:14:08 +0000807
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000808 def test_readlines(self):
809 def bufio():
810 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
811 return self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000812 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
813 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
814 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000815
Antoine Pitrou19690592009-06-12 20:14:08 +0000816 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000817 data = b"abcdefghi"
818 dlen = len(data)
819
820 tests = [
821 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
822 [ 100, [ 3, 3, 3], [ dlen ] ],
823 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
824 ]
825
826 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000827 rawio = self.MockFileIO(data)
828 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000829 pos = 0
830 for nbytes in buf_read_sizes:
Ezio Melotti2623a372010-11-21 13:34:58 +0000831 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Christian Heimes1a6387e2008-03-26 12:49:49 +0000832 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000833 # this is mildly implementation-dependent
Ezio Melotti2623a372010-11-21 13:34:58 +0000834 self.assertEqual(rawio.read_history, raw_read_sizes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000835
Antoine Pitrou19690592009-06-12 20:14:08 +0000836 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000837 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000838 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
839 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000840 self.assertEqual(b"abcd", bufio.read(6))
841 self.assertEqual(b"e", bufio.read(1))
842 self.assertEqual(b"fg", bufio.read())
843 self.assertEqual(b"", bufio.peek(1))
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200844 self.assertIsNone(bufio.read())
Ezio Melotti2623a372010-11-21 13:34:58 +0000845 self.assertEqual(b"", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000846
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200847 rawio = self.MockRawIO((b"a", None, None))
848 self.assertEqual(b"a", rawio.readall())
849 self.assertIsNone(rawio.readall())
850
Antoine Pitrou19690592009-06-12 20:14:08 +0000851 def test_read_past_eof(self):
852 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
853 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000854
Ezio Melotti2623a372010-11-21 13:34:58 +0000855 self.assertEqual(b"abcdefg", bufio.read(9000))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000856
Antoine Pitrou19690592009-06-12 20:14:08 +0000857 def test_read_all(self):
858 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
859 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000860
Ezio Melotti2623a372010-11-21 13:34:58 +0000861 self.assertEqual(b"abcdefg", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000862
Victor Stinner6a102812010-04-27 23:55:59 +0000863 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +0000864 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +0000865 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000866 try:
867 # Write out many bytes with exactly the same number of 0's,
868 # 1's... 255's. This will help us check that concurrent reading
869 # doesn't duplicate or forget contents.
870 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +0000871 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000872 random.shuffle(l)
873 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000874 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000875 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000876 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000877 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000878 errors = []
879 results = []
880 def f():
881 try:
882 # Intra-buffer read then buffer-flushing read
883 for n in cycle([1, 19]):
884 s = bufio.read(n)
885 if not s:
886 break
887 # list.append() is atomic
888 results.append(s)
889 except Exception as e:
890 errors.append(e)
891 raise
892 threads = [threading.Thread(target=f) for x in range(20)]
893 for t in threads:
894 t.start()
895 time.sleep(0.02) # yield
896 for t in threads:
897 t.join()
898 self.assertFalse(errors,
899 "the following exceptions were caught: %r" % errors)
900 s = b''.join(results)
901 for i in range(256):
902 c = bytes(bytearray([i]))
903 self.assertEqual(s.count(c), N)
904 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +0000905 support.unlink(support.TESTFN)
906
907 def test_misbehaved_io(self):
908 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
909 bufio = self.tp(rawio)
910 self.assertRaises(IOError, bufio.seek, 0)
911 self.assertRaises(IOError, bufio.tell)
912
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000913 def test_no_extraneous_read(self):
914 # Issue #9550; when the raw IO object has satisfied the read request,
915 # we should not issue any additional reads, otherwise it may block
916 # (e.g. socket).
917 bufsize = 16
918 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
919 rawio = self.MockRawIO([b"x" * n])
920 bufio = self.tp(rawio, bufsize)
921 self.assertEqual(bufio.read(n), b"x" * n)
922 # Simple case: one raw read is enough to satisfy the request.
923 self.assertEqual(rawio._extraneous_reads, 0,
924 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
925 # A more complex case where two raw reads are needed to satisfy
926 # the request.
927 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
928 bufio = self.tp(rawio, bufsize)
929 self.assertEqual(bufio.read(n), b"x" * n)
930 self.assertEqual(rawio._extraneous_reads, 0,
931 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
932
933
Antoine Pitrou19690592009-06-12 20:14:08 +0000934class CBufferedReaderTest(BufferedReaderTest):
935 tp = io.BufferedReader
936
937 def test_constructor(self):
938 BufferedReaderTest.test_constructor(self)
939 # The allocation can succeed on 32-bit builds, e.g. with more
940 # than 2GB RAM and a 64-bit kernel.
941 if sys.maxsize > 0x7FFFFFFF:
942 rawio = self.MockRawIO()
943 bufio = self.tp(rawio)
944 self.assertRaises((OverflowError, MemoryError, ValueError),
945 bufio.__init__, rawio, sys.maxsize)
946
947 def test_initialization(self):
948 rawio = self.MockRawIO([b"abc"])
949 bufio = self.tp(rawio)
950 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
951 self.assertRaises(ValueError, bufio.read)
952 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
953 self.assertRaises(ValueError, bufio.read)
954 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
955 self.assertRaises(ValueError, bufio.read)
956
957 def test_misbehaved_io_read(self):
958 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
959 bufio = self.tp(rawio)
960 # _pyio.BufferedReader seems to implement reading different, so that
961 # checking this is not so easy.
962 self.assertRaises(IOError, bufio.read, 10)
963
964 def test_garbage_collection(self):
965 # C BufferedReader objects are collected.
966 # The Python version has __del__, so it ends into gc.garbage instead
967 rawio = self.FileIO(support.TESTFN, "w+b")
968 f = self.tp(rawio)
969 f.f = f
970 wr = weakref.ref(f)
971 del f
972 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000973 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +0000974
975class PyBufferedReaderTest(BufferedReaderTest):
976 tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000977
978
Antoine Pitrou19690592009-06-12 20:14:08 +0000979class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
980 write_mode = "wb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000981
Antoine Pitrou19690592009-06-12 20:14:08 +0000982 def test_constructor(self):
983 rawio = self.MockRawIO()
984 bufio = self.tp(rawio)
985 bufio.__init__(rawio)
986 bufio.__init__(rawio, buffer_size=1024)
987 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000988 self.assertEqual(3, bufio.write(b"abc"))
Antoine Pitrou19690592009-06-12 20:14:08 +0000989 bufio.flush()
990 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
991 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
992 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
993 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000994 self.assertEqual(3, bufio.write(b"ghi"))
Antoine Pitrou19690592009-06-12 20:14:08 +0000995 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +0000996 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000997
Antoine Pitrou19690592009-06-12 20:14:08 +0000998 def test_detach_flush(self):
999 raw = self.MockRawIO()
1000 buf = self.tp(raw)
1001 buf.write(b"howdy!")
1002 self.assertFalse(raw._write_stack)
1003 buf.detach()
1004 self.assertEqual(raw._write_stack, [b"howdy!"])
1005
1006 def test_write(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001007 # Write to the buffered IO but don't overflow the buffer.
Antoine Pitrou19690592009-06-12 20:14:08 +00001008 writer = self.MockRawIO()
1009 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001010 bufio.write(b"abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001011 self.assertFalse(writer._write_stack)
1012
Antoine Pitrou19690592009-06-12 20:14:08 +00001013 def test_write_overflow(self):
1014 writer = self.MockRawIO()
1015 bufio = self.tp(writer, 8)
1016 contents = b"abcdefghijklmnop"
1017 for n in range(0, len(contents), 3):
1018 bufio.write(contents[n:n+3])
1019 flushed = b"".join(writer._write_stack)
1020 # At least (total - 8) bytes were implicitly flushed, perhaps more
1021 # depending on the implementation.
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001022 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001023
Antoine Pitrou19690592009-06-12 20:14:08 +00001024 def check_writes(self, intermediate_func):
1025 # Lots of writes, test the flushed output is as expected.
1026 contents = bytes(range(256)) * 1000
1027 n = 0
1028 writer = self.MockRawIO()
1029 bufio = self.tp(writer, 13)
1030 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1031 def gen_sizes():
1032 for size in count(1):
1033 for i in range(15):
1034 yield size
1035 sizes = gen_sizes()
1036 while n < len(contents):
1037 size = min(next(sizes), len(contents) - n)
Ezio Melotti2623a372010-11-21 13:34:58 +00001038 self.assertEqual(bufio.write(contents[n:n+size]), size)
Antoine Pitrou19690592009-06-12 20:14:08 +00001039 intermediate_func(bufio)
1040 n += size
1041 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001042 self.assertEqual(contents,
Antoine Pitrou19690592009-06-12 20:14:08 +00001043 b"".join(writer._write_stack))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001044
Antoine Pitrou19690592009-06-12 20:14:08 +00001045 def test_writes(self):
1046 self.check_writes(lambda bufio: None)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001047
Antoine Pitrou19690592009-06-12 20:14:08 +00001048 def test_writes_and_flushes(self):
1049 self.check_writes(lambda bufio: bufio.flush())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001050
Antoine Pitrou19690592009-06-12 20:14:08 +00001051 def test_writes_and_seeks(self):
1052 def _seekabs(bufio):
1053 pos = bufio.tell()
1054 bufio.seek(pos + 1, 0)
1055 bufio.seek(pos - 1, 0)
1056 bufio.seek(pos, 0)
1057 self.check_writes(_seekabs)
1058 def _seekrel(bufio):
1059 pos = bufio.seek(0, 1)
1060 bufio.seek(+1, 1)
1061 bufio.seek(-1, 1)
1062 bufio.seek(pos, 0)
1063 self.check_writes(_seekrel)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001064
Antoine Pitrou19690592009-06-12 20:14:08 +00001065 def test_writes_and_truncates(self):
1066 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001067
Antoine Pitrou19690592009-06-12 20:14:08 +00001068 def test_write_non_blocking(self):
1069 raw = self.MockNonBlockWriterIO()
1070 bufio = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001071
Ezio Melotti2623a372010-11-21 13:34:58 +00001072 self.assertEqual(bufio.write(b"abcd"), 4)
1073 self.assertEqual(bufio.write(b"efghi"), 5)
Antoine Pitrou19690592009-06-12 20:14:08 +00001074 # 1 byte will be written, the rest will be buffered
1075 raw.block_on(b"k")
Ezio Melotti2623a372010-11-21 13:34:58 +00001076 self.assertEqual(bufio.write(b"jklmn"), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001077
Antoine Pitrou19690592009-06-12 20:14:08 +00001078 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1079 raw.block_on(b"0")
1080 try:
1081 bufio.write(b"opqrwxyz0123456789")
1082 except self.BlockingIOError as e:
1083 written = e.characters_written
1084 else:
1085 self.fail("BlockingIOError should have been raised")
Ezio Melotti2623a372010-11-21 13:34:58 +00001086 self.assertEqual(written, 16)
1087 self.assertEqual(raw.pop_written(),
Antoine Pitrou19690592009-06-12 20:14:08 +00001088 b"abcdefghijklmnopqrwxyz")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001089
Ezio Melotti2623a372010-11-21 13:34:58 +00001090 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Antoine Pitrou19690592009-06-12 20:14:08 +00001091 s = raw.pop_written()
1092 # Previously buffered bytes were flushed
1093 self.assertTrue(s.startswith(b"01234567A"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001094
Antoine Pitrou19690592009-06-12 20:14:08 +00001095 def test_write_and_rewind(self):
1096 raw = io.BytesIO()
1097 bufio = self.tp(raw, 4)
1098 self.assertEqual(bufio.write(b"abcdef"), 6)
1099 self.assertEqual(bufio.tell(), 6)
1100 bufio.seek(0, 0)
1101 self.assertEqual(bufio.write(b"XY"), 2)
1102 bufio.seek(6, 0)
1103 self.assertEqual(raw.getvalue(), b"XYcdef")
1104 self.assertEqual(bufio.write(b"123456"), 6)
1105 bufio.flush()
1106 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001107
Antoine Pitrou19690592009-06-12 20:14:08 +00001108 def test_flush(self):
1109 writer = self.MockRawIO()
1110 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001111 bufio.write(b"abc")
1112 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001113 self.assertEqual(b"abc", writer._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001114
Antoine Pitrou19690592009-06-12 20:14:08 +00001115 def test_destructor(self):
1116 writer = self.MockRawIO()
1117 bufio = self.tp(writer, 8)
1118 bufio.write(b"abc")
1119 del bufio
1120 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001121 self.assertEqual(b"abc", writer._write_stack[0])
Antoine Pitrou19690592009-06-12 20:14:08 +00001122
1123 def test_truncate(self):
1124 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001125 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001126 bufio = self.tp(raw, 8)
1127 bufio.write(b"abcdef")
1128 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001129 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001130 with self.open(support.TESTFN, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001131 self.assertEqual(f.read(), b"abc")
1132
Victor Stinner6a102812010-04-27 23:55:59 +00001133 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +00001134 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +00001135 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001136 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001137 # Write out many bytes from many threads and test they were
1138 # all flushed.
1139 N = 1000
1140 contents = bytes(range(256)) * N
1141 sizes = cycle([1, 19])
1142 n = 0
1143 queue = deque()
1144 while n < len(contents):
1145 size = next(sizes)
1146 queue.append(contents[n:n+size])
1147 n += size
1148 del contents
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001149 # We use a real file object because it allows us to
1150 # exercise situations where the GIL is released before
1151 # writing the buffer to the raw streams. This is in addition
1152 # to concurrency issues due to switching threads in the middle
1153 # of Python code.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001154 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001155 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001156 errors = []
1157 def f():
1158 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001159 while True:
1160 try:
1161 s = queue.popleft()
1162 except IndexError:
1163 return
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001164 bufio.write(s)
1165 except Exception as e:
1166 errors.append(e)
1167 raise
1168 threads = [threading.Thread(target=f) for x in range(20)]
1169 for t in threads:
1170 t.start()
1171 time.sleep(0.02) # yield
1172 for t in threads:
1173 t.join()
1174 self.assertFalse(errors,
1175 "the following exceptions were caught: %r" % errors)
Antoine Pitrou19690592009-06-12 20:14:08 +00001176 bufio.close()
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001177 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001178 s = f.read()
1179 for i in range(256):
Ezio Melotti2623a372010-11-21 13:34:58 +00001180 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001181 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001182 support.unlink(support.TESTFN)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001183
Antoine Pitrou19690592009-06-12 20:14:08 +00001184 def test_misbehaved_io(self):
1185 rawio = self.MisbehavedRawIO()
1186 bufio = self.tp(rawio, 5)
1187 self.assertRaises(IOError, bufio.seek, 0)
1188 self.assertRaises(IOError, bufio.tell)
1189 self.assertRaises(IOError, bufio.write, b"abcdef")
1190
1191 def test_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001192 with support.check_warnings(("max_buffer_size is deprecated",
1193 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001194 self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001195
1196
1197class CBufferedWriterTest(BufferedWriterTest):
1198 tp = io.BufferedWriter
1199
1200 def test_constructor(self):
1201 BufferedWriterTest.test_constructor(self)
1202 # The allocation can succeed on 32-bit builds, e.g. with more
1203 # than 2GB RAM and a 64-bit kernel.
1204 if sys.maxsize > 0x7FFFFFFF:
1205 rawio = self.MockRawIO()
1206 bufio = self.tp(rawio)
1207 self.assertRaises((OverflowError, MemoryError, ValueError),
1208 bufio.__init__, rawio, sys.maxsize)
1209
1210 def test_initialization(self):
1211 rawio = self.MockRawIO()
1212 bufio = self.tp(rawio)
1213 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1214 self.assertRaises(ValueError, bufio.write, b"def")
1215 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1216 self.assertRaises(ValueError, bufio.write, b"def")
1217 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1218 self.assertRaises(ValueError, bufio.write, b"def")
1219
1220 def test_garbage_collection(self):
1221 # C BufferedWriter objects are collected, and collecting them flushes
1222 # all data to disk.
1223 # The Python version has __del__, so it ends into gc.garbage instead
1224 rawio = self.FileIO(support.TESTFN, "w+b")
1225 f = self.tp(rawio)
1226 f.write(b"123xxx")
1227 f.x = f
1228 wr = weakref.ref(f)
1229 del f
1230 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001231 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001232 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001233 self.assertEqual(f.read(), b"123xxx")
1234
1235
1236class PyBufferedWriterTest(BufferedWriterTest):
1237 tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001238
1239class BufferedRWPairTest(unittest.TestCase):
1240
Antoine Pitrou19690592009-06-12 20:14:08 +00001241 def test_constructor(self):
1242 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001243 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001244
Antoine Pitrou19690592009-06-12 20:14:08 +00001245 def test_detach(self):
1246 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1247 self.assertRaises(self.UnsupportedOperation, pair.detach)
1248
1249 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001250 with support.check_warnings(("max_buffer_size is deprecated",
1251 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001252 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001253
1254 def test_constructor_with_not_readable(self):
1255 class NotReadable(MockRawIO):
1256 def readable(self):
1257 return False
1258
1259 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1260
1261 def test_constructor_with_not_writeable(self):
1262 class NotWriteable(MockRawIO):
1263 def writable(self):
1264 return False
1265
1266 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1267
1268 def test_read(self):
1269 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1270
1271 self.assertEqual(pair.read(3), b"abc")
1272 self.assertEqual(pair.read(1), b"d")
1273 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001274 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1275 self.assertEqual(pair.read(None), b"abc")
1276
1277 def test_readlines(self):
1278 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1279 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1280 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1281 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001282
1283 def test_read1(self):
1284 # .read1() is delegated to the underlying reader object, so this test
1285 # can be shallow.
1286 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1287
1288 self.assertEqual(pair.read1(3), b"abc")
1289
1290 def test_readinto(self):
1291 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1292
1293 data = bytearray(5)
1294 self.assertEqual(pair.readinto(data), 5)
1295 self.assertEqual(data, b"abcde")
1296
1297 def test_write(self):
1298 w = self.MockRawIO()
1299 pair = self.tp(self.MockRawIO(), w)
1300
1301 pair.write(b"abc")
1302 pair.flush()
1303 pair.write(b"def")
1304 pair.flush()
1305 self.assertEqual(w._write_stack, [b"abc", b"def"])
1306
1307 def test_peek(self):
1308 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1309
1310 self.assertTrue(pair.peek(3).startswith(b"abc"))
1311 self.assertEqual(pair.read(3), b"abc")
1312
1313 def test_readable(self):
1314 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1315 self.assertTrue(pair.readable())
1316
1317 def test_writeable(self):
1318 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1319 self.assertTrue(pair.writable())
1320
1321 def test_seekable(self):
1322 # BufferedRWPairs are never seekable, even if their readers and writers
1323 # are.
1324 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1325 self.assertFalse(pair.seekable())
1326
1327 # .flush() is delegated to the underlying writer object and has been
1328 # tested in the test_write method.
1329
1330 def test_close_and_closed(self):
1331 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1332 self.assertFalse(pair.closed)
1333 pair.close()
1334 self.assertTrue(pair.closed)
1335
1336 def test_isatty(self):
1337 class SelectableIsAtty(MockRawIO):
1338 def __init__(self, isatty):
1339 MockRawIO.__init__(self)
1340 self._isatty = isatty
1341
1342 def isatty(self):
1343 return self._isatty
1344
1345 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1346 self.assertFalse(pair.isatty())
1347
1348 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1349 self.assertTrue(pair.isatty())
1350
1351 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1352 self.assertTrue(pair.isatty())
1353
1354 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1355 self.assertTrue(pair.isatty())
1356
1357class CBufferedRWPairTest(BufferedRWPairTest):
1358 tp = io.BufferedRWPair
1359
1360class PyBufferedRWPairTest(BufferedRWPairTest):
1361 tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001362
1363
Antoine Pitrou19690592009-06-12 20:14:08 +00001364class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1365 read_mode = "rb+"
1366 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001367
Antoine Pitrou19690592009-06-12 20:14:08 +00001368 def test_constructor(self):
1369 BufferedReaderTest.test_constructor(self)
1370 BufferedWriterTest.test_constructor(self)
1371
1372 def test_read_and_write(self):
1373 raw = self.MockRawIO((b"asdf", b"ghjk"))
1374 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001375
1376 self.assertEqual(b"as", rw.read(2))
1377 rw.write(b"ddd")
1378 rw.write(b"eee")
1379 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001380 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001381 self.assertEqual(b"dddeee", raw._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001382
Antoine Pitrou19690592009-06-12 20:14:08 +00001383 def test_seek_and_tell(self):
1384 raw = self.BytesIO(b"asdfghjkl")
1385 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001386
Ezio Melotti2623a372010-11-21 13:34:58 +00001387 self.assertEqual(b"as", rw.read(2))
1388 self.assertEqual(2, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001389 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001390 self.assertEqual(b"asdf", rw.read(4))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001391
Antoine Pitrou808cec52011-08-20 15:40:58 +02001392 rw.write(b"123f")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001393 rw.seek(0, 0)
Antoine Pitrou808cec52011-08-20 15:40:58 +02001394 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001395 self.assertEqual(9, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001396 rw.seek(-4, 2)
Ezio Melotti2623a372010-11-21 13:34:58 +00001397 self.assertEqual(5, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001398 rw.seek(2, 1)
Ezio Melotti2623a372010-11-21 13:34:58 +00001399 self.assertEqual(7, rw.tell())
1400 self.assertEqual(b"fl", rw.read(11))
Antoine Pitrou808cec52011-08-20 15:40:58 +02001401 rw.flush()
1402 self.assertEqual(b"asdf123fl", raw.getvalue())
1403
Christian Heimes1a6387e2008-03-26 12:49:49 +00001404 self.assertRaises(TypeError, rw.seek, 0.0)
1405
Antoine Pitrou19690592009-06-12 20:14:08 +00001406 def check_flush_and_read(self, read_func):
1407 raw = self.BytesIO(b"abcdefghi")
1408 bufio = self.tp(raw)
1409
Ezio Melotti2623a372010-11-21 13:34:58 +00001410 self.assertEqual(b"ab", read_func(bufio, 2))
Antoine Pitrou19690592009-06-12 20:14:08 +00001411 bufio.write(b"12")
Ezio Melotti2623a372010-11-21 13:34:58 +00001412 self.assertEqual(b"ef", read_func(bufio, 2))
1413 self.assertEqual(6, bufio.tell())
Antoine Pitrou19690592009-06-12 20:14:08 +00001414 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001415 self.assertEqual(6, bufio.tell())
1416 self.assertEqual(b"ghi", read_func(bufio))
Antoine Pitrou19690592009-06-12 20:14:08 +00001417 raw.seek(0, 0)
1418 raw.write(b"XYZ")
1419 # flush() resets the read buffer
1420 bufio.flush()
1421 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001422 self.assertEqual(b"XYZ", read_func(bufio, 3))
Antoine Pitrou19690592009-06-12 20:14:08 +00001423
1424 def test_flush_and_read(self):
1425 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1426
1427 def test_flush_and_readinto(self):
1428 def _readinto(bufio, n=-1):
1429 b = bytearray(n if n >= 0 else 9999)
1430 n = bufio.readinto(b)
1431 return bytes(b[:n])
1432 self.check_flush_and_read(_readinto)
1433
1434 def test_flush_and_peek(self):
1435 def _peek(bufio, n=-1):
1436 # This relies on the fact that the buffer can contain the whole
1437 # raw stream, otherwise peek() can return less.
1438 b = bufio.peek(n)
1439 if n != -1:
1440 b = b[:n]
1441 bufio.seek(len(b), 1)
1442 return b
1443 self.check_flush_and_read(_peek)
1444
1445 def test_flush_and_write(self):
1446 raw = self.BytesIO(b"abcdefghi")
1447 bufio = self.tp(raw)
1448
1449 bufio.write(b"123")
1450 bufio.flush()
1451 bufio.write(b"45")
1452 bufio.flush()
1453 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001454 self.assertEqual(b"12345fghi", raw.getvalue())
1455 self.assertEqual(b"12345fghi", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +00001456
1457 def test_threads(self):
1458 BufferedReaderTest.test_threads(self)
1459 BufferedWriterTest.test_threads(self)
1460
1461 def test_writes_and_peek(self):
1462 def _peek(bufio):
1463 bufio.peek(1)
1464 self.check_writes(_peek)
1465 def _peek(bufio):
1466 pos = bufio.tell()
1467 bufio.seek(-1, 1)
1468 bufio.peek(1)
1469 bufio.seek(pos, 0)
1470 self.check_writes(_peek)
1471
1472 def test_writes_and_reads(self):
1473 def _read(bufio):
1474 bufio.seek(-1, 1)
1475 bufio.read(1)
1476 self.check_writes(_read)
1477
1478 def test_writes_and_read1s(self):
1479 def _read1(bufio):
1480 bufio.seek(-1, 1)
1481 bufio.read1(1)
1482 self.check_writes(_read1)
1483
1484 def test_writes_and_readintos(self):
1485 def _read(bufio):
1486 bufio.seek(-1, 1)
1487 bufio.readinto(bytearray(1))
1488 self.check_writes(_read)
1489
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001490 def test_write_after_readahead(self):
1491 # Issue #6629: writing after the buffer was filled by readahead should
1492 # first rewind the raw stream.
1493 for overwrite_size in [1, 5]:
1494 raw = self.BytesIO(b"A" * 10)
1495 bufio = self.tp(raw, 4)
1496 # Trigger readahead
1497 self.assertEqual(bufio.read(1), b"A")
1498 self.assertEqual(bufio.tell(), 1)
1499 # Overwriting should rewind the raw stream if it needs so
1500 bufio.write(b"B" * overwrite_size)
1501 self.assertEqual(bufio.tell(), overwrite_size + 1)
1502 # If the write size was smaller than the buffer size, flush() and
1503 # check that rewind happens.
1504 bufio.flush()
1505 self.assertEqual(bufio.tell(), overwrite_size + 1)
1506 s = raw.getvalue()
1507 self.assertEqual(s,
1508 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1509
Antoine Pitrouee46a7b2011-05-13 00:31:52 +02001510 def test_write_rewind_write(self):
1511 # Various combinations of reading / writing / seeking backwards / writing again
1512 def mutate(bufio, pos1, pos2):
1513 assert pos2 >= pos1
1514 # Fill the buffer
1515 bufio.seek(pos1)
1516 bufio.read(pos2 - pos1)
1517 bufio.write(b'\x02')
1518 # This writes earlier than the previous write, but still inside
1519 # the buffer.
1520 bufio.seek(pos1)
1521 bufio.write(b'\x01')
1522
1523 b = b"\x80\x81\x82\x83\x84"
1524 for i in range(0, len(b)):
1525 for j in range(i, len(b)):
1526 raw = self.BytesIO(b)
1527 bufio = self.tp(raw, 100)
1528 mutate(bufio, i, j)
1529 bufio.flush()
1530 expected = bytearray(b)
1531 expected[j] = 2
1532 expected[i] = 1
1533 self.assertEqual(raw.getvalue(), expected,
1534 "failed result for i=%d, j=%d" % (i, j))
1535
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001536 def test_truncate_after_read_or_write(self):
1537 raw = self.BytesIO(b"A" * 10)
1538 bufio = self.tp(raw, 100)
1539 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1540 self.assertEqual(bufio.truncate(), 2)
1541 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1542 self.assertEqual(bufio.truncate(), 4)
1543
Antoine Pitrou19690592009-06-12 20:14:08 +00001544 def test_misbehaved_io(self):
1545 BufferedReaderTest.test_misbehaved_io(self)
1546 BufferedWriterTest.test_misbehaved_io(self)
1547
Antoine Pitrou808cec52011-08-20 15:40:58 +02001548 def test_interleaved_read_write(self):
1549 # Test for issue #12213
1550 with self.BytesIO(b'abcdefgh') as raw:
1551 with self.tp(raw, 100) as f:
1552 f.write(b"1")
1553 self.assertEqual(f.read(1), b'b')
1554 f.write(b'2')
1555 self.assertEqual(f.read1(1), b'd')
1556 f.write(b'3')
1557 buf = bytearray(1)
1558 f.readinto(buf)
1559 self.assertEqual(buf, b'f')
1560 f.write(b'4')
1561 self.assertEqual(f.peek(1), b'h')
1562 f.flush()
1563 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1564
1565 with self.BytesIO(b'abc') as raw:
1566 with self.tp(raw, 100) as f:
1567 self.assertEqual(f.read(1), b'a')
1568 f.write(b"2")
1569 self.assertEqual(f.read(1), b'c')
1570 f.flush()
1571 self.assertEqual(raw.getvalue(), b'a2c')
1572
1573 def test_interleaved_readline_write(self):
1574 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1575 with self.tp(raw) as f:
1576 f.write(b'1')
1577 self.assertEqual(f.readline(), b'b\n')
1578 f.write(b'2')
1579 self.assertEqual(f.readline(), b'def\n')
1580 f.write(b'3')
1581 self.assertEqual(f.readline(), b'\n')
1582 f.flush()
1583 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1584
1585
Antoine Pitrou19690592009-06-12 20:14:08 +00001586class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
1587 tp = io.BufferedRandom
1588
1589 def test_constructor(self):
1590 BufferedRandomTest.test_constructor(self)
1591 # The allocation can succeed on 32-bit builds, e.g. with more
1592 # than 2GB RAM and a 64-bit kernel.
1593 if sys.maxsize > 0x7FFFFFFF:
1594 rawio = self.MockRawIO()
1595 bufio = self.tp(rawio)
1596 self.assertRaises((OverflowError, MemoryError, ValueError),
1597 bufio.__init__, rawio, sys.maxsize)
1598
1599 def test_garbage_collection(self):
1600 CBufferedReaderTest.test_garbage_collection(self)
1601 CBufferedWriterTest.test_garbage_collection(self)
1602
1603class PyBufferedRandomTest(BufferedRandomTest):
1604 tp = pyio.BufferedRandom
1605
1606
Christian Heimes1a6387e2008-03-26 12:49:49 +00001607# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1608# properties:
1609# - A single output character can correspond to many bytes of input.
1610# - The number of input bytes to complete the character can be
1611# undetermined until the last input byte is received.
1612# - The number of input bytes can vary depending on previous input.
1613# - A single input byte can correspond to many characters of output.
1614# - The number of output characters can be undetermined until the
1615# last input byte is received.
1616# - The number of output characters can vary depending on previous input.
1617
1618class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1619 """
1620 For testing seek/tell behavior with a stateful, buffering decoder.
1621
1622 Input is a sequence of words. Words may be fixed-length (length set
1623 by input) or variable-length (period-terminated). In variable-length
1624 mode, extra periods are ignored. Possible words are:
1625 - 'i' followed by a number sets the input length, I (maximum 99).
1626 When I is set to 0, words are space-terminated.
1627 - 'o' followed by a number sets the output length, O (maximum 99).
1628 - Any other word is converted into a word followed by a period on
1629 the output. The output word consists of the input word truncated
1630 or padded out with hyphens to make its length equal to O. If O
1631 is 0, the word is output verbatim without truncating or padding.
1632 I and O are initially set to 1. When I changes, any buffered input is
1633 re-scanned according to the new I. EOF also terminates the last word.
1634 """
1635
1636 def __init__(self, errors='strict'):
1637 codecs.IncrementalDecoder.__init__(self, errors)
1638 self.reset()
1639
1640 def __repr__(self):
1641 return '<SID %x>' % id(self)
1642
1643 def reset(self):
1644 self.i = 1
1645 self.o = 1
1646 self.buffer = bytearray()
1647
1648 def getstate(self):
1649 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1650 return bytes(self.buffer), i*100 + o
1651
1652 def setstate(self, state):
1653 buffer, io = state
1654 self.buffer = bytearray(buffer)
1655 i, o = divmod(io, 100)
1656 self.i, self.o = i ^ 1, o ^ 1
1657
1658 def decode(self, input, final=False):
1659 output = ''
1660 for b in input:
1661 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001662 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +00001663 if self.buffer:
1664 output += self.process_word()
1665 else:
1666 self.buffer.append(b)
1667 else: # fixed-length, terminate after self.i bytes
1668 self.buffer.append(b)
1669 if len(self.buffer) == self.i:
1670 output += self.process_word()
1671 if final and self.buffer: # EOF terminates the last word
1672 output += self.process_word()
1673 return output
1674
1675 def process_word(self):
1676 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001677 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001678 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001679 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001680 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1681 else:
1682 output = self.buffer.decode('ascii')
1683 if len(output) < self.o:
1684 output += '-'*self.o # pad out with hyphens
1685 if self.o:
1686 output = output[:self.o] # truncate to output length
1687 output += '.'
1688 self.buffer = bytearray()
1689 return output
1690
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001691 codecEnabled = False
1692
1693 @classmethod
1694 def lookupTestDecoder(cls, name):
1695 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001696 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001697 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001698 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001699 incrementalencoder=None,
1700 streamreader=None, streamwriter=None,
1701 incrementaldecoder=cls)
1702
1703# Register the previous decoder for testing.
1704# Disabled by default, tests will enable it.
1705codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1706
1707
Christian Heimes1a6387e2008-03-26 12:49:49 +00001708class StatefulIncrementalDecoderTest(unittest.TestCase):
1709 """
1710 Make sure the StatefulIncrementalDecoder actually works.
1711 """
1712
1713 test_cases = [
1714 # I=1, O=1 (fixed-length input == fixed-length output)
1715 (b'abcd', False, 'a.b.c.d.'),
1716 # I=0, O=0 (variable-length input, variable-length output)
1717 (b'oiabcd', True, 'abcd.'),
1718 # I=0, O=0 (should ignore extra periods)
1719 (b'oi...abcd...', True, 'abcd.'),
1720 # I=0, O=6 (variable-length input, fixed-length output)
1721 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1722 # I=2, O=6 (fixed-length input < fixed-length output)
1723 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1724 # I=6, O=3 (fixed-length input > fixed-length output)
1725 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1726 # I=0, then 3; O=29, then 15 (with longer output)
1727 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1728 'a----------------------------.' +
1729 'b----------------------------.' +
1730 'cde--------------------------.' +
1731 'abcdefghijabcde.' +
1732 'a.b------------.' +
1733 '.c.------------.' +
1734 'd.e------------.' +
1735 'k--------------.' +
1736 'l--------------.' +
1737 'm--------------.')
1738 ]
1739
Antoine Pitrou19690592009-06-12 20:14:08 +00001740 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001741 # Try a few one-shot test cases.
1742 for input, eof, output in self.test_cases:
1743 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001744 self.assertEqual(d.decode(input, eof), output)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001745
1746 # Also test an unfinished decode, followed by forcing EOF.
1747 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001748 self.assertEqual(d.decode(b'oiabcd'), '')
1749 self.assertEqual(d.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001750
1751class TextIOWrapperTest(unittest.TestCase):
1752
1753 def setUp(self):
1754 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1755 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00001756 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001757
1758 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001759 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001760
Antoine Pitrou19690592009-06-12 20:14:08 +00001761 def test_constructor(self):
1762 r = self.BytesIO(b"\xc3\xa9\n\n")
1763 b = self.BufferedReader(r, 1000)
1764 t = self.TextIOWrapper(b)
1765 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti2623a372010-11-21 13:34:58 +00001766 self.assertEqual(t.encoding, "latin1")
1767 self.assertEqual(t.line_buffering, False)
Antoine Pitrou19690592009-06-12 20:14:08 +00001768 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti2623a372010-11-21 13:34:58 +00001769 self.assertEqual(t.encoding, "utf8")
1770 self.assertEqual(t.line_buffering, True)
1771 self.assertEqual("\xe9\n", t.readline())
Antoine Pitrou19690592009-06-12 20:14:08 +00001772 self.assertRaises(TypeError, t.__init__, b, newline=42)
1773 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1774
1775 def test_detach(self):
1776 r = self.BytesIO()
1777 b = self.BufferedWriter(r)
1778 t = self.TextIOWrapper(b)
1779 self.assertIs(t.detach(), b)
1780
1781 t = self.TextIOWrapper(b, encoding="ascii")
1782 t.write("howdy")
1783 self.assertFalse(r.getvalue())
1784 t.detach()
1785 self.assertEqual(r.getvalue(), b"howdy")
1786 self.assertRaises(ValueError, t.detach)
1787
1788 def test_repr(self):
1789 raw = self.BytesIO("hello".encode("utf-8"))
1790 b = self.BufferedReader(raw)
1791 t = self.TextIOWrapper(b, encoding="utf-8")
1792 modname = self.TextIOWrapper.__module__
1793 self.assertEqual(repr(t),
1794 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1795 raw.name = "dummy"
1796 self.assertEqual(repr(t),
1797 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1798 raw.name = b"dummy"
1799 self.assertEqual(repr(t),
1800 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1801
1802 def test_line_buffering(self):
1803 r = self.BytesIO()
1804 b = self.BufferedWriter(r, 1000)
1805 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1806 t.write("X")
Ezio Melotti2623a372010-11-21 13:34:58 +00001807 self.assertEqual(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001808 t.write("Y\nZ")
Ezio Melotti2623a372010-11-21 13:34:58 +00001809 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001810 t.write("A\rB")
Ezio Melotti2623a372010-11-21 13:34:58 +00001811 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001812
Antoine Pitrou19690592009-06-12 20:14:08 +00001813 def test_encoding(self):
1814 # Check the encoding attribute is always set, and valid
1815 b = self.BytesIO()
1816 t = self.TextIOWrapper(b, encoding="utf8")
1817 self.assertEqual(t.encoding, "utf8")
1818 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001819 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001820 codecs.lookup(t.encoding)
1821
1822 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001823 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001824 b = self.BytesIO(b"abc\n\xff\n")
1825 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001826 self.assertRaises(UnicodeError, t.read)
1827 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001828 b = self.BytesIO(b"abc\n\xff\n")
1829 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001830 self.assertRaises(UnicodeError, t.read)
1831 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001832 b = self.BytesIO(b"abc\n\xff\n")
1833 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti2623a372010-11-21 13:34:58 +00001834 self.assertEqual(t.read(), "abc\n\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001835 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001836 b = self.BytesIO(b"abc\n\xff\n")
1837 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti2623a372010-11-21 13:34:58 +00001838 self.assertEqual(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001839
Antoine Pitrou19690592009-06-12 20:14:08 +00001840 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001841 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001842 b = self.BytesIO()
1843 t = self.TextIOWrapper(b, encoding="ascii")
1844 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001845 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001846 b = self.BytesIO()
1847 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1848 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001849 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001850 b = self.BytesIO()
1851 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001852 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001853 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001854 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001855 self.assertEqual(b.getvalue(), b"abcdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001856 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001857 b = self.BytesIO()
1858 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001859 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001860 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001861 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001862 self.assertEqual(b.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001863
Antoine Pitrou19690592009-06-12 20:14:08 +00001864 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001865 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1866
1867 tests = [
1868 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1869 [ '', input_lines ],
1870 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1871 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1872 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1873 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001874 encodings = (
1875 'utf-8', 'latin-1',
1876 'utf-16', 'utf-16-le', 'utf-16-be',
1877 'utf-32', 'utf-32-le', 'utf-32-be',
1878 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00001879
1880 # Try a range of buffer sizes to test the case where \r is the last
1881 # character in TextIOWrapper._pending_line.
1882 for encoding in encodings:
1883 # XXX: str.encode() should return bytes
1884 data = bytes(''.join(input_lines).encode(encoding))
1885 for do_reads in (False, True):
1886 for bufsize in range(1, 10):
1887 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001888 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1889 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00001890 encoding=encoding)
1891 if do_reads:
1892 got_lines = []
1893 while True:
1894 c2 = textio.read(2)
1895 if c2 == '':
1896 break
Ezio Melotti2623a372010-11-21 13:34:58 +00001897 self.assertEqual(len(c2), 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001898 got_lines.append(c2 + textio.readline())
1899 else:
1900 got_lines = list(textio)
1901
1902 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti2623a372010-11-21 13:34:58 +00001903 self.assertEqual(got_line, exp_line)
1904 self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001905
Antoine Pitrou19690592009-06-12 20:14:08 +00001906 def test_newlines_input(self):
1907 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001908 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1909 for newline, expected in [
1910 (None, normalized.decode("ascii").splitlines(True)),
1911 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00001912 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1913 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1914 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00001915 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00001916 buf = self.BytesIO(testdata)
1917 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti2623a372010-11-21 13:34:58 +00001918 self.assertEqual(txt.readlines(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001919 txt.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001920 self.assertEqual(txt.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001921
Antoine Pitrou19690592009-06-12 20:14:08 +00001922 def test_newlines_output(self):
1923 testdict = {
1924 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1925 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1926 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1927 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1928 }
1929 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1930 for newline, expected in tests:
1931 buf = self.BytesIO()
1932 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1933 txt.write("AAA\nB")
1934 txt.write("BB\nCCC\n")
1935 txt.write("X\rY\r\nZ")
1936 txt.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001937 self.assertEqual(buf.closed, False)
1938 self.assertEqual(buf.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00001939
1940 def test_destructor(self):
1941 l = []
1942 base = self.BytesIO
1943 class MyBytesIO(base):
1944 def close(self):
1945 l.append(self.getvalue())
1946 base.close(self)
1947 b = MyBytesIO()
1948 t = self.TextIOWrapper(b, encoding="ascii")
1949 t.write("abc")
1950 del t
1951 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001952 self.assertEqual([b"abc"], l)
Antoine Pitrou19690592009-06-12 20:14:08 +00001953
1954 def test_override_destructor(self):
1955 record = []
1956 class MyTextIO(self.TextIOWrapper):
1957 def __del__(self):
1958 record.append(1)
1959 try:
1960 f = super(MyTextIO, self).__del__
1961 except AttributeError:
1962 pass
1963 else:
1964 f()
1965 def close(self):
1966 record.append(2)
1967 super(MyTextIO, self).close()
1968 def flush(self):
1969 record.append(3)
1970 super(MyTextIO, self).flush()
1971 b = self.BytesIO()
1972 t = MyTextIO(b, encoding="ascii")
1973 del t
1974 support.gc_collect()
1975 self.assertEqual(record, [1, 2, 3])
1976
1977 def test_error_through_destructor(self):
1978 # Test that the exception state is not modified by a destructor,
1979 # even if close() fails.
1980 rawio = self.CloseFailureIO()
1981 def f():
1982 self.TextIOWrapper(rawio).xyzzy
1983 with support.captured_output("stderr") as s:
1984 self.assertRaises(AttributeError, f)
1985 s = s.getvalue().strip()
1986 if s:
1987 # The destructor *may* have printed an unraisable error, check it
1988 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001989 self.assertTrue(s.startswith("Exception IOError: "), s)
1990 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001991
1992 # Systematic tests of the text I/O API
1993
Antoine Pitrou19690592009-06-12 20:14:08 +00001994 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001995 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1996 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00001997 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001998 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00001999 self.assertEqual(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002000 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002001 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002002 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002003 self.assertEqual(f.tell(), 0)
2004 self.assertEqual(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002005 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002006 self.assertEqual(f.seek(0), 0)
2007 self.assertEqual(f.read(None), "abc")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002008 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002009 self.assertEqual(f.read(2), "ab")
2010 self.assertEqual(f.read(1), "c")
2011 self.assertEqual(f.read(1), "")
2012 self.assertEqual(f.read(), "")
2013 self.assertEqual(f.tell(), cookie)
2014 self.assertEqual(f.seek(0), 0)
2015 self.assertEqual(f.seek(0, 2), cookie)
2016 self.assertEqual(f.write("def"), 3)
2017 self.assertEqual(f.seek(cookie), cookie)
2018 self.assertEqual(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002019 if enc.startswith("utf"):
2020 self.multi_line_test(f, enc)
2021 f.close()
2022
2023 def multi_line_test(self, f, enc):
2024 f.seek(0)
2025 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00002026 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002027 wlines = []
2028 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2029 chars = []
2030 for i in range(size):
2031 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00002032 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002033 wlines.append((f.tell(), line))
2034 f.write(line)
2035 f.seek(0)
2036 rlines = []
2037 while True:
2038 pos = f.tell()
2039 line = f.readline()
2040 if not line:
2041 break
2042 rlines.append((pos, line))
Ezio Melotti2623a372010-11-21 13:34:58 +00002043 self.assertEqual(rlines, wlines)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002044
Antoine Pitrou19690592009-06-12 20:14:08 +00002045 def test_telling(self):
2046 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002047 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002048 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002049 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002050 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002051 p2 = f.tell()
2052 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002053 self.assertEqual(f.tell(), p0)
2054 self.assertEqual(f.readline(), "\xff\n")
2055 self.assertEqual(f.tell(), p1)
2056 self.assertEqual(f.readline(), "\xff\n")
2057 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002058 f.seek(0)
2059 for line in f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002060 self.assertEqual(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002061 self.assertRaises(IOError, f.tell)
Ezio Melotti2623a372010-11-21 13:34:58 +00002062 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002063 f.close()
2064
Antoine Pitrou19690592009-06-12 20:14:08 +00002065 def test_seeking(self):
2066 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00002067 prefix_size = chunk_size - 2
2068 u_prefix = "a" * prefix_size
2069 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melotti2623a372010-11-21 13:34:58 +00002070 self.assertEqual(len(u_prefix), len(prefix))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002071 u_suffix = "\u8888\n"
2072 suffix = bytes(u_suffix.encode("utf-8"))
2073 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00002074 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002075 f.write(line*2)
2076 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002077 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002078 s = f.read(prefix_size)
Ezio Melotti2623a372010-11-21 13:34:58 +00002079 self.assertEqual(s, prefix.decode("ascii"))
2080 self.assertEqual(f.tell(), prefix_size)
2081 self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002082
Antoine Pitrou19690592009-06-12 20:14:08 +00002083 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002084 # Regression test for a specific bug
2085 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00002086 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002087 f.write(data)
2088 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002089 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002090 f._CHUNK_SIZE # Just test that it exists
2091 f._CHUNK_SIZE = 2
2092 f.readline()
2093 f.tell()
2094
Antoine Pitrou19690592009-06-12 20:14:08 +00002095 def test_seek_and_tell(self):
2096 #Test seek/tell using the StatefulIncrementalDecoder.
2097 # Make test faster by doing smaller seeks
2098 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00002099
Antoine Pitrou19690592009-06-12 20:14:08 +00002100 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002101 """Tell/seek to various points within a data stream and ensure
2102 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00002103 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00002104 f.write(data)
2105 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002106 f = self.open(support.TESTFN, encoding='test_decoder')
2107 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00002108 decoded = f.read()
2109 f.close()
2110
2111 for i in range(min_pos, len(decoded) + 1): # seek positions
2112 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00002113 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melotti2623a372010-11-21 13:34:58 +00002114 self.assertEqual(f.read(i), decoded[:i])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002115 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002116 self.assertEqual(f.read(j), decoded[i:i + j])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002117 f.seek(cookie)
Ezio Melotti2623a372010-11-21 13:34:58 +00002118 self.assertEqual(f.read(), decoded[i:])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002119 f.close()
2120
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002121 # Enable the test decoder.
2122 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00002123
2124 # Run the tests.
2125 try:
2126 # Try each test case.
2127 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00002128 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002129
2130 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00002131 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2132 offset = CHUNK_SIZE - len(input)//2
2133 prefix = b'.'*offset
2134 # Don't bother seeking into the prefix (takes too long).
2135 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00002136 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002137
2138 # Ensure our test decoder won't interfere with subsequent tests.
2139 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002140 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002141
Antoine Pitrou19690592009-06-12 20:14:08 +00002142 def test_encoded_writes(self):
2143 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002144 tests = ("utf-16",
2145 "utf-16-le",
2146 "utf-16-be",
2147 "utf-32",
2148 "utf-32-le",
2149 "utf-32-be")
2150 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002151 buf = self.BytesIO()
2152 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002153 # Check if the BOM is written only once (see issue1753).
2154 f.write(data)
2155 f.write(data)
2156 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002157 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002158 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002159 self.assertEqual(f.read(), data * 2)
2160 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002161
Antoine Pitrou19690592009-06-12 20:14:08 +00002162 def test_unreadable(self):
2163 class UnReadable(self.BytesIO):
2164 def readable(self):
2165 return False
2166 txt = self.TextIOWrapper(UnReadable())
2167 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002168
Antoine Pitrou19690592009-06-12 20:14:08 +00002169 def test_read_one_by_one(self):
2170 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002171 reads = ""
2172 while True:
2173 c = txt.read(1)
2174 if not c:
2175 break
2176 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002177 self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002178
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002179 def test_readlines(self):
2180 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2181 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2182 txt.seek(0)
2183 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2184 txt.seek(0)
2185 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2186
Christian Heimes1a6387e2008-03-26 12:49:49 +00002187 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002188 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002189 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002190 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002191 reads = ""
2192 while True:
2193 c = txt.read(128)
2194 if not c:
2195 break
2196 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002197 self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002198
2199 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002200 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002201
2202 # read one char at a time
2203 reads = ""
2204 while True:
2205 c = txt.read(1)
2206 if not c:
2207 break
2208 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002209 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002210
2211 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002212 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002213 txt._CHUNK_SIZE = 4
2214
2215 reads = ""
2216 while True:
2217 c = txt.read(4)
2218 if not c:
2219 break
2220 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002221 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002222
2223 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002224 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002225 txt._CHUNK_SIZE = 4
2226
2227 reads = txt.read(4)
2228 reads += txt.read(4)
2229 reads += txt.readline()
2230 reads += txt.readline()
2231 reads += txt.readline()
Ezio Melotti2623a372010-11-21 13:34:58 +00002232 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002233
2234 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002235 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002236 txt._CHUNK_SIZE = 4
2237
2238 reads = txt.read(4)
2239 reads += txt.read()
Ezio Melotti2623a372010-11-21 13:34:58 +00002240 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002241
2242 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002243 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002244 txt._CHUNK_SIZE = 4
2245
2246 reads = txt.read(4)
2247 pos = txt.tell()
2248 txt.seek(0)
2249 txt.seek(pos)
Ezio Melotti2623a372010-11-21 13:34:58 +00002250 self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002251
2252 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002253 buffer = self.BytesIO(self.testdata)
2254 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002255
2256 self.assertEqual(buffer.seekable(), txt.seekable())
2257
Antoine Pitrou19690592009-06-12 20:14:08 +00002258 def test_append_bom(self):
2259 # The BOM is not written again when appending to a non-empty file
2260 filename = support.TESTFN
2261 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2262 with self.open(filename, 'w', encoding=charset) as f:
2263 f.write('aaa')
2264 pos = f.tell()
2265 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002266 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002267
2268 with self.open(filename, 'a', encoding=charset) as f:
2269 f.write('xxx')
2270 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002271 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002272
Antoine Pitrou19690592009-06-12 20:14:08 +00002273 def test_seek_bom(self):
2274 # Same test, but when seeking manually
2275 filename = support.TESTFN
2276 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2277 with self.open(filename, 'w', encoding=charset) as f:
2278 f.write('aaa')
2279 pos = f.tell()
2280 with self.open(filename, 'r+', encoding=charset) as f:
2281 f.seek(pos)
2282 f.write('zzz')
2283 f.seek(0)
2284 f.write('bbb')
2285 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002286 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002287
2288 def test_errors_property(self):
2289 with self.open(support.TESTFN, "w") as f:
2290 self.assertEqual(f.errors, "strict")
2291 with self.open(support.TESTFN, "w", errors="replace") as f:
2292 self.assertEqual(f.errors, "replace")
2293
Victor Stinner6a102812010-04-27 23:55:59 +00002294 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002295 def test_threads_write(self):
2296 # Issue6750: concurrent writes could duplicate data
2297 event = threading.Event()
2298 with self.open(support.TESTFN, "w", buffering=1) as f:
2299 def run(n):
2300 text = "Thread%03d\n" % n
2301 event.wait()
2302 f.write(text)
2303 threads = [threading.Thread(target=lambda n=x: run(n))
2304 for x in range(20)]
2305 for t in threads:
2306 t.start()
2307 time.sleep(0.02)
2308 event.set()
2309 for t in threads:
2310 t.join()
2311 with self.open(support.TESTFN) as f:
2312 content = f.read()
2313 for n in range(20):
Ezio Melotti2623a372010-11-21 13:34:58 +00002314 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002315
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002316 def test_flush_error_on_close(self):
2317 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2318 def bad_flush():
2319 raise IOError()
2320 txt.flush = bad_flush
2321 self.assertRaises(IOError, txt.close) # exception not swallowed
2322
2323 def test_multi_close(self):
2324 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2325 txt.close()
2326 txt.close()
2327 txt.close()
2328 self.assertRaises(ValueError, txt.flush)
2329
Antoine Pitroufc9ead62010-12-21 21:26:55 +00002330 def test_readonly_attributes(self):
2331 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2332 buf = self.BytesIO(self.testdata)
2333 with self.assertRaises((AttributeError, TypeError)):
2334 txt.buffer = buf
2335
Antoine Pitrou19690592009-06-12 20:14:08 +00002336class CTextIOWrapperTest(TextIOWrapperTest):
2337
2338 def test_initialization(self):
2339 r = self.BytesIO(b"\xc3\xa9\n\n")
2340 b = self.BufferedReader(r, 1000)
2341 t = self.TextIOWrapper(b)
2342 self.assertRaises(TypeError, t.__init__, b, newline=42)
2343 self.assertRaises(ValueError, t.read)
2344 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2345 self.assertRaises(ValueError, t.read)
2346
2347 def test_garbage_collection(self):
2348 # C TextIOWrapper objects are collected, and collecting them flushes
2349 # all data to disk.
2350 # The Python version has __del__, so it ends in gc.garbage instead.
2351 rawio = io.FileIO(support.TESTFN, "wb")
2352 b = self.BufferedWriter(rawio)
2353 t = self.TextIOWrapper(b, encoding="ascii")
2354 t.write("456def")
2355 t.x = t
2356 wr = weakref.ref(t)
2357 del t
2358 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002359 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002360 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002361 self.assertEqual(f.read(), b"456def")
2362
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002363 def test_rwpair_cleared_before_textio(self):
2364 # Issue 13070: TextIOWrapper's finalization would crash when called
2365 # after the reference to the underlying BufferedRWPair's writer got
2366 # cleared by the GC.
2367 for i in range(1000):
2368 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2369 t1 = self.TextIOWrapper(b1, encoding="ascii")
2370 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2371 t2 = self.TextIOWrapper(b2, encoding="ascii")
2372 # circular references
2373 t1.buddy = t2
2374 t2.buddy = t1
2375 support.gc_collect()
2376
2377
Antoine Pitrou19690592009-06-12 20:14:08 +00002378class PyTextIOWrapperTest(TextIOWrapperTest):
2379 pass
2380
2381
2382class IncrementalNewlineDecoderTest(unittest.TestCase):
2383
2384 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002385 # UTF-8 specific tests for a newline decoder
2386 def _check_decode(b, s, **kwargs):
2387 # We exercise getstate() / setstate() as well as decode()
2388 state = decoder.getstate()
Ezio Melotti2623a372010-11-21 13:34:58 +00002389 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002390 decoder.setstate(state)
Ezio Melotti2623a372010-11-21 13:34:58 +00002391 self.assertEqual(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002392
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002393 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002394
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002395 _check_decode(b'\xe8', "")
2396 _check_decode(b'\xa2', "")
2397 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002398
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002399 _check_decode(b'\xe8', "")
2400 _check_decode(b'\xa2', "")
2401 _check_decode(b'\x88', "\u8888")
2402
2403 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002404 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2405
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002406 decoder.reset()
2407 _check_decode(b'\n', "\n")
2408 _check_decode(b'\r', "")
2409 _check_decode(b'', "\n", final=True)
2410 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002411
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002412 _check_decode(b'\r', "")
2413 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002414
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002415 _check_decode(b'\r\r\n', "\n\n")
2416 _check_decode(b'\r', "")
2417 _check_decode(b'\r', "\n")
2418 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002419
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002420 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2421 _check_decode(b'\xe8\xa2\x88', "\u8888")
2422 _check_decode(b'\n', "\n")
2423 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2424 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002425
Antoine Pitrou19690592009-06-12 20:14:08 +00002426 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002427 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002428 if encoding is not None:
2429 encoder = codecs.getincrementalencoder(encoding)()
2430 def _decode_bytewise(s):
2431 # Decode one byte at a time
2432 for b in encoder.encode(s):
2433 result.append(decoder.decode(b))
2434 else:
2435 encoder = None
2436 def _decode_bytewise(s):
2437 # Decode one char at a time
2438 for c in s:
2439 result.append(decoder.decode(c))
Ezio Melotti2623a372010-11-21 13:34:58 +00002440 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002441 _decode_bytewise("abc\n\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002442 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002443 _decode_bytewise("\nabc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002444 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002445 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002446 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002447 _decode_bytewise("abc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002448 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002449 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002450 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002451 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002452 input = "abc"
2453 if encoder is not None:
2454 encoder.reset()
2455 input = encoder.encode(input)
Ezio Melotti2623a372010-11-21 13:34:58 +00002456 self.assertEqual(decoder.decode(input), "abc")
2457 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002458
2459 def test_newline_decoder(self):
2460 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002461 # None meaning the IncrementalNewlineDecoder takes unicode input
2462 # rather than bytes input
2463 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002464 'utf-16', 'utf-16-le', 'utf-16-be',
2465 'utf-32', 'utf-32-le', 'utf-32-be',
2466 )
2467 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002468 decoder = enc and codecs.getincrementaldecoder(enc)()
2469 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2470 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002471 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002472 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2473 self.check_newline_decoding_utf8(decoder)
2474
2475 def test_newline_bytes(self):
2476 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2477 def _check(dec):
Ezio Melotti2623a372010-11-21 13:34:58 +00002478 self.assertEqual(dec.newlines, None)
2479 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2480 self.assertEqual(dec.newlines, None)
2481 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2482 self.assertEqual(dec.newlines, None)
Antoine Pitrou19690592009-06-12 20:14:08 +00002483 dec = self.IncrementalNewlineDecoder(None, translate=False)
2484 _check(dec)
2485 dec = self.IncrementalNewlineDecoder(None, translate=True)
2486 _check(dec)
2487
2488class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2489 pass
2490
2491class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2492 pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002493
Christian Heimes1a6387e2008-03-26 12:49:49 +00002494
2495# XXX Tests for open()
2496
2497class MiscIOTest(unittest.TestCase):
2498
Benjamin Petersonad100c32008-11-20 22:06:22 +00002499 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002500 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002501
Antoine Pitrou19690592009-06-12 20:14:08 +00002502 def test___all__(self):
2503 for name in self.io.__all__:
2504 obj = getattr(self.io, name, None)
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002505 self.assertTrue(obj is not None, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002506 if name == "open":
2507 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002508 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002509 self.assertTrue(issubclass(obj, Exception), name)
2510 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002511 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002512
Benjamin Petersonad100c32008-11-20 22:06:22 +00002513 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002514 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002515 self.assertEqual(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002516 f.close()
2517
Antoine Pitrou19690592009-06-12 20:14:08 +00002518 f = self.open(support.TESTFN, "U")
Ezio Melotti2623a372010-11-21 13:34:58 +00002519 self.assertEqual(f.name, support.TESTFN)
2520 self.assertEqual(f.buffer.name, support.TESTFN)
2521 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2522 self.assertEqual(f.mode, "U")
2523 self.assertEqual(f.buffer.mode, "rb")
2524 self.assertEqual(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002525 f.close()
2526
Antoine Pitrou19690592009-06-12 20:14:08 +00002527 f = self.open(support.TESTFN, "w+")
Ezio Melotti2623a372010-11-21 13:34:58 +00002528 self.assertEqual(f.mode, "w+")
2529 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2530 self.assertEqual(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002531
Antoine Pitrou19690592009-06-12 20:14:08 +00002532 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melotti2623a372010-11-21 13:34:58 +00002533 self.assertEqual(g.mode, "wb")
2534 self.assertEqual(g.raw.mode, "wb")
2535 self.assertEqual(g.name, f.fileno())
2536 self.assertEqual(g.raw.name, f.fileno())
Benjamin Petersonad100c32008-11-20 22:06:22 +00002537 f.close()
2538 g.close()
2539
Antoine Pitrou19690592009-06-12 20:14:08 +00002540 def test_io_after_close(self):
2541 for kwargs in [
2542 {"mode": "w"},
2543 {"mode": "wb"},
2544 {"mode": "w", "buffering": 1},
2545 {"mode": "w", "buffering": 2},
2546 {"mode": "wb", "buffering": 0},
2547 {"mode": "r"},
2548 {"mode": "rb"},
2549 {"mode": "r", "buffering": 1},
2550 {"mode": "r", "buffering": 2},
2551 {"mode": "rb", "buffering": 0},
2552 {"mode": "w+"},
2553 {"mode": "w+b"},
2554 {"mode": "w+", "buffering": 1},
2555 {"mode": "w+", "buffering": 2},
2556 {"mode": "w+b", "buffering": 0},
2557 ]:
2558 f = self.open(support.TESTFN, **kwargs)
2559 f.close()
2560 self.assertRaises(ValueError, f.flush)
2561 self.assertRaises(ValueError, f.fileno)
2562 self.assertRaises(ValueError, f.isatty)
2563 self.assertRaises(ValueError, f.__iter__)
2564 if hasattr(f, "peek"):
2565 self.assertRaises(ValueError, f.peek, 1)
2566 self.assertRaises(ValueError, f.read)
2567 if hasattr(f, "read1"):
2568 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinner5100a402011-05-25 22:15:36 +02002569 if hasattr(f, "readall"):
2570 self.assertRaises(ValueError, f.readall)
Antoine Pitrou19690592009-06-12 20:14:08 +00002571 if hasattr(f, "readinto"):
2572 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2573 self.assertRaises(ValueError, f.readline)
2574 self.assertRaises(ValueError, f.readlines)
2575 self.assertRaises(ValueError, f.seek, 0)
2576 self.assertRaises(ValueError, f.tell)
2577 self.assertRaises(ValueError, f.truncate)
2578 self.assertRaises(ValueError, f.write,
2579 b"" if "b" in kwargs['mode'] else "")
2580 self.assertRaises(ValueError, f.writelines, [])
2581 self.assertRaises(ValueError, next, f)
2582
2583 def test_blockingioerror(self):
2584 # Various BlockingIOError issues
2585 self.assertRaises(TypeError, self.BlockingIOError)
2586 self.assertRaises(TypeError, self.BlockingIOError, 1)
2587 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2588 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2589 b = self.BlockingIOError(1, "")
2590 self.assertEqual(b.characters_written, 0)
2591 class C(unicode):
2592 pass
2593 c = C("")
2594 b = self.BlockingIOError(1, c)
2595 c.b = b
2596 b.c = c
2597 wr = weakref.ref(c)
2598 del c, b
2599 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002600 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002601
2602 def test_abcs(self):
2603 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002604 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2605 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2606 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2607 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002608
2609 def _check_abc_inheritance(self, abcmodule):
2610 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002611 self.assertIsInstance(f, abcmodule.IOBase)
2612 self.assertIsInstance(f, abcmodule.RawIOBase)
2613 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2614 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002615 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002616 self.assertIsInstance(f, abcmodule.IOBase)
2617 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2618 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2619 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002620 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002621 self.assertIsInstance(f, abcmodule.IOBase)
2622 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2623 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2624 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002625
2626 def test_abc_inheritance(self):
2627 # Test implementations inherit from their respective ABCs
2628 self._check_abc_inheritance(self)
2629
2630 def test_abc_inheritance_official(self):
2631 # Test implementations inherit from the official ABCs of the
2632 # baseline "io" module.
2633 self._check_abc_inheritance(io)
2634
Antoine Pitrou5aa7df32011-11-21 20:16:44 +01002635 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2636 def test_nonblock_pipe_write_bigbuf(self):
2637 self._test_nonblock_pipe_write(16*1024)
2638
2639 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2640 def test_nonblock_pipe_write_smallbuf(self):
2641 self._test_nonblock_pipe_write(1024)
2642
2643 def _set_non_blocking(self, fd):
2644 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2645 self.assertNotEqual(flags, -1)
2646 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2647 self.assertEqual(res, 0)
2648
2649 def _test_nonblock_pipe_write(self, bufsize):
2650 sent = []
2651 received = []
2652 r, w = os.pipe()
2653 self._set_non_blocking(r)
2654 self._set_non_blocking(w)
2655
2656 # To exercise all code paths in the C implementation we need
2657 # to play with buffer sizes. For instance, if we choose a
2658 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2659 # then we will never get a partial write of the buffer.
2660 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2661 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2662
2663 with rf, wf:
2664 for N in 9999, 73, 7574:
2665 try:
2666 i = 0
2667 while True:
2668 msg = bytes([i % 26 + 97]) * N
2669 sent.append(msg)
2670 wf.write(msg)
2671 i += 1
2672
2673 except self.BlockingIOError as e:
2674 self.assertEqual(e.args[0], errno.EAGAIN)
2675 sent[-1] = sent[-1][:e.characters_written]
2676 received.append(rf.read())
2677 msg = b'BLOCKED'
2678 wf.write(msg)
2679 sent.append(msg)
2680
2681 while True:
2682 try:
2683 wf.flush()
2684 break
2685 except self.BlockingIOError as e:
2686 self.assertEqual(e.args[0], errno.EAGAIN)
2687 self.assertEqual(e.characters_written, 0)
2688 received.append(rf.read())
2689
2690 received += iter(rf.read, None)
2691
2692 sent, received = b''.join(sent), b''.join(received)
2693 self.assertTrue(sent == received)
2694 self.assertTrue(wf.closed)
2695 self.assertTrue(rf.closed)
2696
Antoine Pitrou19690592009-06-12 20:14:08 +00002697class CMiscIOTest(MiscIOTest):
2698 io = io
2699
2700class PyMiscIOTest(MiscIOTest):
2701 io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002702
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002703
2704@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2705class SignalsTest(unittest.TestCase):
2706
2707 def setUp(self):
2708 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2709
2710 def tearDown(self):
2711 signal.signal(signal.SIGALRM, self.oldalrm)
2712
2713 def alarm_interrupt(self, sig, frame):
Florent Xicluna3fa3b002010-09-13 08:20:19 +00002714 1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002715
2716 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinner49d495f2011-07-04 11:44:46 +02002717 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
2718 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002719 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2720 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou6439c002011-02-25 21:35:47 +00002721 invokes the signal handler, and bubbles up the exception raised
2722 in the latter."""
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002723 read_results = []
2724 def _read():
2725 s = os.read(r, 1)
2726 read_results.append(s)
2727 t = threading.Thread(target=_read)
2728 t.daemon = True
2729 r, w = os.pipe()
2730 try:
2731 wio = self.io.open(w, **fdopen_kwargs)
2732 t.start()
2733 signal.alarm(1)
2734 # Fill the pipe enough that the write will be blocking.
2735 # It will be interrupted by the timer armed above. Since the
2736 # other thread has read one byte, the low-level write will
2737 # return with a successful (partial) result rather than an EINTR.
2738 # The buffered IO layer must check for pending signal
2739 # handlers, which in this case will invoke alarm_interrupt().
2740 self.assertRaises(ZeroDivisionError,
2741 wio.write, item * (1024 * 1024))
2742 t.join()
2743 # We got one byte, get another one and check that it isn't a
2744 # repeat of the first one.
2745 read_results.append(os.read(r, 1))
2746 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2747 finally:
2748 os.close(w)
2749 os.close(r)
2750 # This is deliberate. If we didn't close the file descriptor
2751 # before closing wio, wio would try to flush its internal
2752 # buffer, and block again.
2753 try:
2754 wio.close()
2755 except IOError as e:
2756 if e.errno != errno.EBADF:
2757 raise
2758
2759 def test_interrupted_write_unbuffered(self):
2760 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2761
2762 def test_interrupted_write_buffered(self):
2763 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2764
2765 def test_interrupted_write_text(self):
2766 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2767
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002768 def check_reentrant_write(self, data, **fdopen_kwargs):
2769 def on_alarm(*args):
2770 # Will be called reentrantly from the same thread
2771 wio.write(data)
Victor Stinner4c41f842011-07-05 13:29:26 +02002772 1//0
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002773 signal.signal(signal.SIGALRM, on_alarm)
2774 r, w = os.pipe()
2775 wio = self.io.open(w, **fdopen_kwargs)
2776 try:
2777 signal.alarm(1)
2778 # Either the reentrant call to wio.write() fails with RuntimeError,
2779 # or the signal handler raises ZeroDivisionError.
2780 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2781 while 1:
2782 for i in range(100):
2783 wio.write(data)
2784 wio.flush()
2785 # Make sure the buffer doesn't fill up and block further writes
2786 os.read(r, len(data) * 100)
2787 exc = cm.exception
2788 if isinstance(exc, RuntimeError):
2789 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2790 finally:
2791 wio.close()
2792 os.close(r)
2793
2794 def test_reentrant_write_buffered(self):
2795 self.check_reentrant_write(b"xy", mode="wb")
2796
2797 def test_reentrant_write_text(self):
2798 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2799
Antoine Pitrou6439c002011-02-25 21:35:47 +00002800 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2801 """Check that a buffered read, when it gets interrupted (either
2802 returning a partial result or EINTR), properly invokes the signal
2803 handler and retries if the latter returned successfully."""
2804 r, w = os.pipe()
2805 fdopen_kwargs["closefd"] = False
2806 def alarm_handler(sig, frame):
2807 os.write(w, b"bar")
2808 signal.signal(signal.SIGALRM, alarm_handler)
2809 try:
2810 rio = self.io.open(r, **fdopen_kwargs)
2811 os.write(w, b"foo")
2812 signal.alarm(1)
2813 # Expected behaviour:
2814 # - first raw read() returns partial b"foo"
2815 # - second raw read() returns EINTR
2816 # - third raw read() returns b"bar"
2817 self.assertEqual(decode(rio.read(6)), "foobar")
2818 finally:
2819 rio.close()
2820 os.close(w)
2821 os.close(r)
2822
2823 def test_interrupterd_read_retry_buffered(self):
2824 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
2825 mode="rb")
2826
2827 def test_interrupterd_read_retry_text(self):
2828 self.check_interrupted_read_retry(lambda x: x,
2829 mode="r")
2830
2831 @unittest.skipUnless(threading, 'Threading required for this test.')
2832 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
2833 """Check that a buffered write, when it gets interrupted (either
2834 returning a partial result or EINTR), properly invokes the signal
2835 handler and retries if the latter returned successfully."""
2836 select = support.import_module("select")
2837 # A quantity that exceeds the buffer size of an anonymous pipe's
2838 # write end.
2839 N = 1024 * 1024
2840 r, w = os.pipe()
2841 fdopen_kwargs["closefd"] = False
2842 # We need a separate thread to read from the pipe and allow the
2843 # write() to finish. This thread is started after the SIGALRM is
2844 # received (forcing a first EINTR in write()).
2845 read_results = []
2846 write_finished = False
2847 def _read():
2848 while not write_finished:
2849 while r in select.select([r], [], [], 1.0)[0]:
2850 s = os.read(r, 1024)
2851 read_results.append(s)
2852 t = threading.Thread(target=_read)
2853 t.daemon = True
2854 def alarm1(sig, frame):
2855 signal.signal(signal.SIGALRM, alarm2)
2856 signal.alarm(1)
2857 def alarm2(sig, frame):
2858 t.start()
2859 signal.signal(signal.SIGALRM, alarm1)
2860 try:
2861 wio = self.io.open(w, **fdopen_kwargs)
2862 signal.alarm(1)
2863 # Expected behaviour:
2864 # - first raw write() is partial (because of the limited pipe buffer
2865 # and the first alarm)
2866 # - second raw write() returns EINTR (because of the second alarm)
2867 # - subsequent write()s are successful (either partial or complete)
2868 self.assertEqual(N, wio.write(item * N))
2869 wio.flush()
2870 write_finished = True
2871 t.join()
2872 self.assertEqual(N, sum(len(x) for x in read_results))
2873 finally:
2874 write_finished = True
2875 os.close(w)
2876 os.close(r)
2877 # This is deliberate. If we didn't close the file descriptor
2878 # before closing wio, wio would try to flush its internal
2879 # buffer, and could block (in case of failure).
2880 try:
2881 wio.close()
2882 except IOError as e:
2883 if e.errno != errno.EBADF:
2884 raise
2885
2886 def test_interrupterd_write_retry_buffered(self):
2887 self.check_interrupted_write_retry(b"x", mode="wb")
2888
2889 def test_interrupterd_write_retry_text(self):
2890 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
2891
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002892
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002893class CSignalsTest(SignalsTest):
2894 io = io
2895
2896class PySignalsTest(SignalsTest):
2897 io = pyio
2898
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002899 # Handling reentrancy issues would slow down _pyio even more, so the
2900 # tests are disabled.
2901 test_reentrant_write_buffered = None
2902 test_reentrant_write_text = None
2903
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002904
Christian Heimes1a6387e2008-03-26 12:49:49 +00002905def test_main():
Antoine Pitrou19690592009-06-12 20:14:08 +00002906 tests = (CIOTest, PyIOTest,
2907 CBufferedReaderTest, PyBufferedReaderTest,
2908 CBufferedWriterTest, PyBufferedWriterTest,
2909 CBufferedRWPairTest, PyBufferedRWPairTest,
2910 CBufferedRandomTest, PyBufferedRandomTest,
2911 StatefulIncrementalDecoderTest,
2912 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2913 CTextIOWrapperTest, PyTextIOWrapperTest,
2914 CMiscIOTest, PyMiscIOTest,
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002915 CSignalsTest, PySignalsTest,
Antoine Pitrou19690592009-06-12 20:14:08 +00002916 )
2917
2918 # Put the namespaces of the IO module we are testing and some useful mock
2919 # classes in the __dict__ of each test.
2920 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou6391b342010-09-14 18:48:19 +00002921 MockNonBlockWriterIO, MockRawIOWithoutRead)
Antoine Pitrou19690592009-06-12 20:14:08 +00002922 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2923 c_io_ns = dict((name, getattr(io, name)) for name in all_members)
2924 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
2925 globs = globals()
2926 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2927 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2928 # Avoid turning open into a bound method.
2929 py_io_ns["open"] = pyio.OpenWrapper
2930 for test in tests:
2931 if test.__name__.startswith("C"):
2932 for name, obj in c_io_ns.items():
2933 setattr(test, name, obj)
2934 elif test.__name__.startswith("Py"):
2935 for name, obj in py_io_ns.items():
2936 setattr(test, name, obj)
2937
2938 support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002939
2940if __name__ == "__main__":
Antoine Pitrou19690592009-06-12 20:14:08 +00002941 test_main()