blob: b9972f3ac56573d1047684da840b04de282c8e8f [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Antoine Pitrou19690592009-06-12 20:14:08 +000032import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000033import signal
34import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000035from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000036from collections import deque
37from test import test_support as support
Christian Heimes1a6387e2008-03-26 12:49:49 +000038
39import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000042try:
43 import threading
44except ImportError:
45 threading = None
Antoine Pitrou5aa7df32011-11-21 20:16:44 +010046try:
47 import fcntl
48except ImportError:
49 fcntl = None
Antoine Pitrou19690592009-06-12 20:14:08 +000050
51__metaclass__ = type
52bytes = support.py3k_bytes
53
54def _default_chunk_size():
55 """Get the default TextIOWrapper chunk size"""
56 with io.open(__file__, "r", encoding="latin1") as f:
57 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000058
59
Antoine Pitrou6391b342010-09-14 18:48:19 +000060class MockRawIOWithoutRead:
61 """A RawIO implementation without read(), so as to exercise the default
62 RawIO.read() which calls readinto()."""
Christian Heimes1a6387e2008-03-26 12:49:49 +000063
64 def __init__(self, read_stack=()):
65 self._read_stack = list(read_stack)
66 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +000067 self._reads = 0
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000068 self._extraneous_reads = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +000069
Christian Heimes1a6387e2008-03-26 12:49:49 +000070 def write(self, b):
Antoine Pitrou19690592009-06-12 20:14:08 +000071 self._write_stack.append(bytes(b))
Christian Heimes1a6387e2008-03-26 12:49:49 +000072 return len(b)
73
74 def writable(self):
75 return True
76
77 def fileno(self):
78 return 42
79
80 def readable(self):
81 return True
82
83 def seekable(self):
84 return True
85
86 def seek(self, pos, whence):
Antoine Pitrou19690592009-06-12 20:14:08 +000087 return 0 # wrong but we gotta return something
Christian Heimes1a6387e2008-03-26 12:49:49 +000088
89 def tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +000090 return 0 # same comment as above
91
92 def readinto(self, buf):
93 self._reads += 1
94 max_len = len(buf)
95 try:
96 data = self._read_stack[0]
97 except IndexError:
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000098 self._extraneous_reads += 1
Antoine Pitrou19690592009-06-12 20:14:08 +000099 return 0
100 if data is None:
101 del self._read_stack[0]
102 return None
103 n = len(data)
104 if len(data) <= max_len:
105 del self._read_stack[0]
106 buf[:n] = data
107 return n
108 else:
109 buf[:] = data[:max_len]
110 self._read_stack[0] = data[max_len:]
111 return max_len
112
113 def truncate(self, pos=None):
114 return pos
115
Antoine Pitrou6391b342010-09-14 18:48:19 +0000116class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
117 pass
118
119class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
120 pass
121
122
123class MockRawIO(MockRawIOWithoutRead):
124
125 def read(self, n=None):
126 self._reads += 1
127 try:
128 return self._read_stack.pop(0)
129 except:
130 self._extraneous_reads += 1
131 return b""
132
Antoine Pitrou19690592009-06-12 20:14:08 +0000133class CMockRawIO(MockRawIO, io.RawIOBase):
134 pass
135
136class PyMockRawIO(MockRawIO, pyio.RawIOBase):
137 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000138
139
Antoine Pitrou19690592009-06-12 20:14:08 +0000140class MisbehavedRawIO(MockRawIO):
141 def write(self, b):
142 return MockRawIO.write(self, b) * 2
143
144 def read(self, n=None):
145 return MockRawIO.read(self, n) * 2
146
147 def seek(self, pos, whence):
148 return -123
149
150 def tell(self):
151 return -456
152
153 def readinto(self, buf):
154 MockRawIO.readinto(self, buf)
155 return len(buf) * 5
156
157class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
158 pass
159
160class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
161 pass
162
163
164class CloseFailureIO(MockRawIO):
165 closed = 0
166
167 def close(self):
168 if not self.closed:
169 self.closed = 1
170 raise IOError
171
172class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
173 pass
174
175class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
176 pass
177
178
179class MockFileIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000180
181 def __init__(self, data):
182 self.read_history = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000183 super(MockFileIO, self).__init__(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000184
185 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +0000186 res = super(MockFileIO, self).read(n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000187 self.read_history.append(None if res is None else len(res))
188 return res
189
Antoine Pitrou19690592009-06-12 20:14:08 +0000190 def readinto(self, b):
191 res = super(MockFileIO, self).readinto(b)
192 self.read_history.append(res)
193 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000194
Antoine Pitrou19690592009-06-12 20:14:08 +0000195class CMockFileIO(MockFileIO, io.BytesIO):
196 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000197
Antoine Pitrou19690592009-06-12 20:14:08 +0000198class PyMockFileIO(MockFileIO, pyio.BytesIO):
199 pass
200
201
202class MockNonBlockWriterIO:
203
204 def __init__(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000205 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000206 self._blocker_char = None
Christian Heimes1a6387e2008-03-26 12:49:49 +0000207
Antoine Pitrou19690592009-06-12 20:14:08 +0000208 def pop_written(self):
209 s = b"".join(self._write_stack)
210 self._write_stack[:] = []
211 return s
212
213 def block_on(self, char):
214 """Block when a given char is encountered."""
215 self._blocker_char = char
216
217 def readable(self):
218 return True
219
220 def seekable(self):
221 return True
Christian Heimes1a6387e2008-03-26 12:49:49 +0000222
223 def writable(self):
224 return True
225
Antoine Pitrou19690592009-06-12 20:14:08 +0000226 def write(self, b):
227 b = bytes(b)
228 n = -1
229 if self._blocker_char:
230 try:
231 n = b.index(self._blocker_char)
232 except ValueError:
233 pass
234 else:
Antoine Pitrou5aa7df32011-11-21 20:16:44 +0100235 if n > 0:
236 # write data up to the first blocker
237 self._write_stack.append(b[:n])
238 return n
239 else:
240 # cancel blocker and indicate would block
241 self._blocker_char = None
242 return None
Antoine Pitrou19690592009-06-12 20:14:08 +0000243 self._write_stack.append(b)
244 return len(b)
245
246class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
247 BlockingIOError = io.BlockingIOError
248
249class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
250 BlockingIOError = pyio.BlockingIOError
251
Christian Heimes1a6387e2008-03-26 12:49:49 +0000252
253class IOTest(unittest.TestCase):
254
Antoine Pitrou19690592009-06-12 20:14:08 +0000255 def setUp(self):
256 support.unlink(support.TESTFN)
257
Christian Heimes1a6387e2008-03-26 12:49:49 +0000258 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000259 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000260
261 def write_ops(self, f):
262 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000263 f.truncate(0)
264 self.assertEqual(f.tell(), 5)
265 f.seek(0)
266
267 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000268 self.assertEqual(f.seek(0), 0)
269 self.assertEqual(f.write(b"Hello."), 6)
270 self.assertEqual(f.tell(), 6)
271 self.assertEqual(f.seek(-1, 1), 5)
272 self.assertEqual(f.tell(), 5)
273 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
274 self.assertEqual(f.seek(0), 0)
275 self.assertEqual(f.write(b"h"), 1)
276 self.assertEqual(f.seek(-1, 2), 13)
277 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000278
Christian Heimes1a6387e2008-03-26 12:49:49 +0000279 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000280 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000281 self.assertRaises(TypeError, f.seek, 0.0)
282
283 def read_ops(self, f, buffered=False):
284 data = f.read(5)
285 self.assertEqual(data, b"hello")
286 data = bytearray(data)
287 self.assertEqual(f.readinto(data), 5)
288 self.assertEqual(data, b" worl")
289 self.assertEqual(f.readinto(data), 2)
290 self.assertEqual(len(data), 5)
291 self.assertEqual(data[:2], b"d\n")
292 self.assertEqual(f.seek(0), 0)
293 self.assertEqual(f.read(20), b"hello world\n")
294 self.assertEqual(f.read(1), b"")
295 self.assertEqual(f.readinto(bytearray(b"x")), 0)
296 self.assertEqual(f.seek(-6, 2), 6)
297 self.assertEqual(f.read(5), b"world")
298 self.assertEqual(f.read(0), b"")
299 self.assertEqual(f.readinto(bytearray()), 0)
300 self.assertEqual(f.seek(-6, 1), 5)
301 self.assertEqual(f.read(5), b" worl")
302 self.assertEqual(f.tell(), 10)
303 self.assertRaises(TypeError, f.seek, 0.0)
304 if buffered:
305 f.seek(0)
306 self.assertEqual(f.read(), b"hello world\n")
307 f.seek(6)
308 self.assertEqual(f.read(), b"world\n")
309 self.assertEqual(f.read(), b"")
310
311 LARGE = 2**31
312
313 def large_file_ops(self, f):
314 assert f.readable()
315 assert f.writable()
316 self.assertEqual(f.seek(self.LARGE), self.LARGE)
317 self.assertEqual(f.tell(), self.LARGE)
318 self.assertEqual(f.write(b"xxx"), 3)
319 self.assertEqual(f.tell(), self.LARGE + 3)
320 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
321 self.assertEqual(f.truncate(), self.LARGE + 2)
322 self.assertEqual(f.tell(), self.LARGE + 2)
323 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
324 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000325 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000326 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
327 self.assertEqual(f.seek(-1, 2), self.LARGE)
328 self.assertEqual(f.read(2), b"x")
329
Antoine Pitrou19690592009-06-12 20:14:08 +0000330 def test_invalid_operations(self):
331 # Try writing on a file opened in read mode and vice-versa.
332 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000333 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000334 self.assertRaises(IOError, fp.read)
335 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000336 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000337 self.assertRaises(IOError, fp.write, b"blah")
338 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000339 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000340 self.assertRaises(IOError, fp.write, "blah")
341 self.assertRaises(IOError, fp.writelines, ["blah\n"])
342
Christian Heimes1a6387e2008-03-26 12:49:49 +0000343 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000344 with self.open(support.TESTFN, "wb", buffering=0) as f:
345 self.assertEqual(f.readable(), False)
346 self.assertEqual(f.writable(), True)
347 self.assertEqual(f.seekable(), True)
348 self.write_ops(f)
349 with self.open(support.TESTFN, "rb", buffering=0) as f:
350 self.assertEqual(f.readable(), True)
351 self.assertEqual(f.writable(), False)
352 self.assertEqual(f.seekable(), True)
353 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000354
355 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000356 with self.open(support.TESTFN, "wb") as f:
357 self.assertEqual(f.readable(), False)
358 self.assertEqual(f.writable(), True)
359 self.assertEqual(f.seekable(), True)
360 self.write_ops(f)
361 with self.open(support.TESTFN, "rb") as f:
362 self.assertEqual(f.readable(), True)
363 self.assertEqual(f.writable(), False)
364 self.assertEqual(f.seekable(), True)
365 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000366
367 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000368 with self.open(support.TESTFN, "wb") as f:
369 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
370 with self.open(support.TESTFN, "rb") as f:
371 self.assertEqual(f.readline(), b"abc\n")
372 self.assertEqual(f.readline(10), b"def\n")
373 self.assertEqual(f.readline(2), b"xy")
374 self.assertEqual(f.readline(4), b"zzy\n")
375 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000376 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000377 self.assertRaises(TypeError, f.readline, 5.3)
378 with self.open(support.TESTFN, "r") as f:
379 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000380
381 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000382 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000383 self.write_ops(f)
384 data = f.getvalue()
385 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000386 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000387 self.read_ops(f, True)
388
389 def test_large_file_ops(self):
390 # On Windows and Mac OSX this test comsumes large resources; It takes
391 # a long time to build the >2GB file and takes >2GB of disk space
392 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000393 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
394 if not support.is_resource_enabled("largefile"):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000395 print("\nTesting large file ops skipped on %s." % sys.platform,
396 file=sys.stderr)
397 print("It requires %d bytes and a long time." % self.LARGE,
398 file=sys.stderr)
399 print("Use 'regrtest.py -u largefile test_io' to run it.",
400 file=sys.stderr)
401 return
Antoine Pitrou19690592009-06-12 20:14:08 +0000402 with self.open(support.TESTFN, "w+b", 0) as f:
403 self.large_file_ops(f)
404 with self.open(support.TESTFN, "w+b") as f:
405 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000406
407 def test_with_open(self):
408 for bufsize in (0, 1, 100):
409 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000410 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000411 f.write(b"xxx")
412 self.assertEqual(f.closed, True)
413 f = None
414 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000415 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000416 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000417 except ZeroDivisionError:
418 self.assertEqual(f.closed, True)
419 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000420 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000421
Antoine Pitroue741cc62009-01-21 00:45:36 +0000422 # issue 5008
423 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000424 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000425 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000426 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000427 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000428 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000429 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000430 with self.open(support.TESTFN, "a") as f:
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000431 self.assertTrue(f.tell() > 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000432
Christian Heimes1a6387e2008-03-26 12:49:49 +0000433 def test_destructor(self):
434 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000435 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000436 def __del__(self):
437 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000438 try:
439 f = super(MyFileIO, self).__del__
440 except AttributeError:
441 pass
442 else:
443 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000444 def close(self):
445 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000446 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000447 def flush(self):
448 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000449 super(MyFileIO, self).flush()
450 f = MyFileIO(support.TESTFN, "wb")
451 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000452 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000453 support.gc_collect()
454 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000455 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000456 self.assertEqual(f.read(), b"xxx")
457
458 def _check_base_destructor(self, base):
459 record = []
460 class MyIO(base):
461 def __init__(self):
462 # This exercises the availability of attributes on object
463 # destruction.
464 # (in the C version, close() is called by the tp_dealloc
465 # function, not by __del__)
466 self.on_del = 1
467 self.on_close = 2
468 self.on_flush = 3
469 def __del__(self):
470 record.append(self.on_del)
471 try:
472 f = super(MyIO, self).__del__
473 except AttributeError:
474 pass
475 else:
476 f()
477 def close(self):
478 record.append(self.on_close)
479 super(MyIO, self).close()
480 def flush(self):
481 record.append(self.on_flush)
482 super(MyIO, self).flush()
483 f = MyIO()
484 del f
485 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000486 self.assertEqual(record, [1, 2, 3])
487
Antoine Pitrou19690592009-06-12 20:14:08 +0000488 def test_IOBase_destructor(self):
489 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000490
Antoine Pitrou19690592009-06-12 20:14:08 +0000491 def test_RawIOBase_destructor(self):
492 self._check_base_destructor(self.RawIOBase)
493
494 def test_BufferedIOBase_destructor(self):
495 self._check_base_destructor(self.BufferedIOBase)
496
497 def test_TextIOBase_destructor(self):
498 self._check_base_destructor(self.TextIOBase)
499
500 def test_close_flushes(self):
501 with self.open(support.TESTFN, "wb") as f:
502 f.write(b"xxx")
503 with self.open(support.TESTFN, "rb") as f:
504 self.assertEqual(f.read(), b"xxx")
505
506 def test_array_writes(self):
507 a = array.array(b'i', range(10))
508 n = len(a.tostring())
509 with self.open(support.TESTFN, "wb", 0) as f:
510 self.assertEqual(f.write(a), n)
511 with self.open(support.TESTFN, "wb") as f:
512 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000513
514 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000515 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000516 closefd=False)
517
Antoine Pitrou19690592009-06-12 20:14:08 +0000518 def test_read_closed(self):
519 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000520 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000521 with self.open(support.TESTFN, "r") as f:
522 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000523 self.assertEqual(file.read(), "egg\n")
524 file.seek(0)
525 file.close()
526 self.assertRaises(ValueError, file.read)
527
528 def test_no_closefd_with_filename(self):
529 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000530 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000531
532 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000533 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000534 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000535 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000536 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000537 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000538 self.assertEqual(file.buffer.raw.closefd, False)
539
Antoine Pitrou19690592009-06-12 20:14:08 +0000540 def test_garbage_collection(self):
541 # FileIO objects are collected, and collecting them flushes
542 # all data to disk.
543 f = self.FileIO(support.TESTFN, "wb")
544 f.write(b"abcxxx")
545 f.f = f
546 wr = weakref.ref(f)
547 del f
548 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000549 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000550 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000551 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000552
Antoine Pitrou19690592009-06-12 20:14:08 +0000553 def test_unbounded_file(self):
554 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
555 zero = "/dev/zero"
556 if not os.path.exists(zero):
557 self.skipTest("{0} does not exist".format(zero))
558 if sys.maxsize > 0x7FFFFFFF:
559 self.skipTest("test can only run in a 32-bit address space")
560 if support.real_max_memuse < support._2G:
561 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000562 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000563 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000564 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000565 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000566 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000567 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000568
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000569 def test_flush_error_on_close(self):
570 f = self.open(support.TESTFN, "wb", buffering=0)
571 def bad_flush():
572 raise IOError()
573 f.flush = bad_flush
574 self.assertRaises(IOError, f.close) # exception not swallowed
575
576 def test_multi_close(self):
577 f = self.open(support.TESTFN, "wb", buffering=0)
578 f.close()
579 f.close()
580 f.close()
581 self.assertRaises(ValueError, f.flush)
582
Antoine Pitrou6391b342010-09-14 18:48:19 +0000583 def test_RawIOBase_read(self):
584 # Exercise the default RawIOBase.read() implementation (which calls
585 # readinto() internally).
586 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
587 self.assertEqual(rawio.read(2), b"ab")
588 self.assertEqual(rawio.read(2), b"c")
589 self.assertEqual(rawio.read(2), b"d")
590 self.assertEqual(rawio.read(2), None)
591 self.assertEqual(rawio.read(2), b"ef")
592 self.assertEqual(rawio.read(2), b"g")
593 self.assertEqual(rawio.read(2), None)
594 self.assertEqual(rawio.read(2), b"")
595
Antoine Pitrou19690592009-06-12 20:14:08 +0000596class CIOTest(IOTest):
Antoine Pitrou16166452011-07-12 22:04:20 +0200597
598 def test_IOBase_finalize(self):
599 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
600 # class which inherits IOBase and an object of this class are caught
601 # in a reference cycle and close() is already in the method cache.
602 class MyIO(self.IOBase):
603 def close(self):
604 pass
605
606 # create an instance to populate the method cache
607 MyIO()
608 obj = MyIO()
609 obj.obj = obj
610 wr = weakref.ref(obj)
611 del MyIO
612 del obj
613 support.gc_collect()
614 self.assertTrue(wr() is None, wr)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000615
Antoine Pitrou19690592009-06-12 20:14:08 +0000616class PyIOTest(IOTest):
617 test_array_writes = unittest.skip(
618 "len(array.array) returns number of elements rather than bytelength"
619 )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000620
621
Antoine Pitrou19690592009-06-12 20:14:08 +0000622class CommonBufferedTests:
623 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
624
625 def test_detach(self):
626 raw = self.MockRawIO()
627 buf = self.tp(raw)
628 self.assertIs(buf.detach(), raw)
629 self.assertRaises(ValueError, buf.detach)
630
631 def test_fileno(self):
632 rawio = self.MockRawIO()
633 bufio = self.tp(rawio)
634
Ezio Melotti2623a372010-11-21 13:34:58 +0000635 self.assertEqual(42, bufio.fileno())
Antoine Pitrou19690592009-06-12 20:14:08 +0000636
637 def test_no_fileno(self):
638 # XXX will we always have fileno() function? If so, kill
639 # this test. Else, write it.
640 pass
641
642 def test_invalid_args(self):
643 rawio = self.MockRawIO()
644 bufio = self.tp(rawio)
645 # Invalid whence
646 self.assertRaises(ValueError, bufio.seek, 0, -1)
647 self.assertRaises(ValueError, bufio.seek, 0, 3)
648
649 def test_override_destructor(self):
650 tp = self.tp
651 record = []
652 class MyBufferedIO(tp):
653 def __del__(self):
654 record.append(1)
655 try:
656 f = super(MyBufferedIO, self).__del__
657 except AttributeError:
658 pass
659 else:
660 f()
661 def close(self):
662 record.append(2)
663 super(MyBufferedIO, self).close()
664 def flush(self):
665 record.append(3)
666 super(MyBufferedIO, self).flush()
667 rawio = self.MockRawIO()
668 bufio = MyBufferedIO(rawio)
669 writable = bufio.writable()
670 del bufio
671 support.gc_collect()
672 if writable:
673 self.assertEqual(record, [1, 2, 3])
674 else:
675 self.assertEqual(record, [1, 2])
676
677 def test_context_manager(self):
678 # Test usability as a context manager
679 rawio = self.MockRawIO()
680 bufio = self.tp(rawio)
681 def _with():
682 with bufio:
683 pass
684 _with()
685 # bufio should now be closed, and using it a second time should raise
686 # a ValueError.
687 self.assertRaises(ValueError, _with)
688
689 def test_error_through_destructor(self):
690 # Test that the exception state is not modified by a destructor,
691 # even if close() fails.
692 rawio = self.CloseFailureIO()
693 def f():
694 self.tp(rawio).xyzzy
695 with support.captured_output("stderr") as s:
696 self.assertRaises(AttributeError, f)
697 s = s.getvalue().strip()
698 if s:
699 # The destructor *may* have printed an unraisable error, check it
700 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000701 self.assertTrue(s.startswith("Exception IOError: "), s)
702 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000703
704 def test_repr(self):
705 raw = self.MockRawIO()
706 b = self.tp(raw)
707 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
708 self.assertEqual(repr(b), "<%s>" % clsname)
709 raw.name = "dummy"
710 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
711 raw.name = b"dummy"
712 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000713
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000714 def test_flush_error_on_close(self):
715 raw = self.MockRawIO()
716 def bad_flush():
717 raise IOError()
718 raw.flush = bad_flush
719 b = self.tp(raw)
720 self.assertRaises(IOError, b.close) # exception not swallowed
721
722 def test_multi_close(self):
723 raw = self.MockRawIO()
724 b = self.tp(raw)
725 b.close()
726 b.close()
727 b.close()
728 self.assertRaises(ValueError, b.flush)
729
Antoine Pitroufc9ead62010-12-21 21:26:55 +0000730 def test_readonly_attributes(self):
731 raw = self.MockRawIO()
732 buf = self.tp(raw)
733 x = self.MockRawIO()
734 with self.assertRaises((AttributeError, TypeError)):
735 buf.raw = x
736
Christian Heimes1a6387e2008-03-26 12:49:49 +0000737
Antoine Pitrou19690592009-06-12 20:14:08 +0000738class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
739 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000740
Antoine Pitrou19690592009-06-12 20:14:08 +0000741 def test_constructor(self):
742 rawio = self.MockRawIO([b"abc"])
743 bufio = self.tp(rawio)
744 bufio.__init__(rawio)
745 bufio.__init__(rawio, buffer_size=1024)
746 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000747 self.assertEqual(b"abc", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000748 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
749 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
750 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
751 rawio = self.MockRawIO([b"abc"])
752 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000753 self.assertEqual(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000754
Antoine Pitrou19690592009-06-12 20:14:08 +0000755 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000756 for arg in (None, 7):
757 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
758 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000759 self.assertEqual(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000760 # Invalid args
761 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000762
Antoine Pitrou19690592009-06-12 20:14:08 +0000763 def test_read1(self):
764 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
765 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000766 self.assertEqual(b"a", bufio.read(1))
767 self.assertEqual(b"b", bufio.read1(1))
768 self.assertEqual(rawio._reads, 1)
769 self.assertEqual(b"c", bufio.read1(100))
770 self.assertEqual(rawio._reads, 1)
771 self.assertEqual(b"d", bufio.read1(100))
772 self.assertEqual(rawio._reads, 2)
773 self.assertEqual(b"efg", bufio.read1(100))
774 self.assertEqual(rawio._reads, 3)
775 self.assertEqual(b"", bufio.read1(100))
776 self.assertEqual(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000777 # Invalid args
778 self.assertRaises(ValueError, bufio.read1, -1)
779
780 def test_readinto(self):
781 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
782 bufio = self.tp(rawio)
783 b = bytearray(2)
Ezio Melotti2623a372010-11-21 13:34:58 +0000784 self.assertEqual(bufio.readinto(b), 2)
785 self.assertEqual(b, b"ab")
786 self.assertEqual(bufio.readinto(b), 2)
787 self.assertEqual(b, b"cd")
788 self.assertEqual(bufio.readinto(b), 2)
789 self.assertEqual(b, b"ef")
790 self.assertEqual(bufio.readinto(b), 1)
791 self.assertEqual(b, b"gf")
792 self.assertEqual(bufio.readinto(b), 0)
793 self.assertEqual(b, b"gf")
Antoine Pitrou19690592009-06-12 20:14:08 +0000794
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000795 def test_readlines(self):
796 def bufio():
797 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
798 return self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000799 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
800 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
801 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000802
Antoine Pitrou19690592009-06-12 20:14:08 +0000803 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000804 data = b"abcdefghi"
805 dlen = len(data)
806
807 tests = [
808 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
809 [ 100, [ 3, 3, 3], [ dlen ] ],
810 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
811 ]
812
813 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000814 rawio = self.MockFileIO(data)
815 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000816 pos = 0
817 for nbytes in buf_read_sizes:
Ezio Melotti2623a372010-11-21 13:34:58 +0000818 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Christian Heimes1a6387e2008-03-26 12:49:49 +0000819 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000820 # this is mildly implementation-dependent
Ezio Melotti2623a372010-11-21 13:34:58 +0000821 self.assertEqual(rawio.read_history, raw_read_sizes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000822
Antoine Pitrou19690592009-06-12 20:14:08 +0000823 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000824 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000825 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
826 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000827 self.assertEqual(b"abcd", bufio.read(6))
828 self.assertEqual(b"e", bufio.read(1))
829 self.assertEqual(b"fg", bufio.read())
830 self.assertEqual(b"", bufio.peek(1))
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200831 self.assertIsNone(bufio.read())
Ezio Melotti2623a372010-11-21 13:34:58 +0000832 self.assertEqual(b"", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000833
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200834 rawio = self.MockRawIO((b"a", None, None))
835 self.assertEqual(b"a", rawio.readall())
836 self.assertIsNone(rawio.readall())
837
Antoine Pitrou19690592009-06-12 20:14:08 +0000838 def test_read_past_eof(self):
839 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
840 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000841
Ezio Melotti2623a372010-11-21 13:34:58 +0000842 self.assertEqual(b"abcdefg", bufio.read(9000))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000843
Antoine Pitrou19690592009-06-12 20:14:08 +0000844 def test_read_all(self):
845 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
846 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000847
Ezio Melotti2623a372010-11-21 13:34:58 +0000848 self.assertEqual(b"abcdefg", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000849
Victor Stinner6a102812010-04-27 23:55:59 +0000850 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +0000851 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +0000852 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000853 try:
854 # Write out many bytes with exactly the same number of 0's,
855 # 1's... 255's. This will help us check that concurrent reading
856 # doesn't duplicate or forget contents.
857 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +0000858 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000859 random.shuffle(l)
860 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000861 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000862 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000863 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000864 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000865 errors = []
866 results = []
867 def f():
868 try:
869 # Intra-buffer read then buffer-flushing read
870 for n in cycle([1, 19]):
871 s = bufio.read(n)
872 if not s:
873 break
874 # list.append() is atomic
875 results.append(s)
876 except Exception as e:
877 errors.append(e)
878 raise
879 threads = [threading.Thread(target=f) for x in range(20)]
880 for t in threads:
881 t.start()
882 time.sleep(0.02) # yield
883 for t in threads:
884 t.join()
885 self.assertFalse(errors,
886 "the following exceptions were caught: %r" % errors)
887 s = b''.join(results)
888 for i in range(256):
889 c = bytes(bytearray([i]))
890 self.assertEqual(s.count(c), N)
891 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +0000892 support.unlink(support.TESTFN)
893
894 def test_misbehaved_io(self):
895 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
896 bufio = self.tp(rawio)
897 self.assertRaises(IOError, bufio.seek, 0)
898 self.assertRaises(IOError, bufio.tell)
899
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000900 def test_no_extraneous_read(self):
901 # Issue #9550; when the raw IO object has satisfied the read request,
902 # we should not issue any additional reads, otherwise it may block
903 # (e.g. socket).
904 bufsize = 16
905 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
906 rawio = self.MockRawIO([b"x" * n])
907 bufio = self.tp(rawio, bufsize)
908 self.assertEqual(bufio.read(n), b"x" * n)
909 # Simple case: one raw read is enough to satisfy the request.
910 self.assertEqual(rawio._extraneous_reads, 0,
911 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
912 # A more complex case where two raw reads are needed to satisfy
913 # the request.
914 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
915 bufio = self.tp(rawio, bufsize)
916 self.assertEqual(bufio.read(n), b"x" * n)
917 self.assertEqual(rawio._extraneous_reads, 0,
918 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
919
920
Antoine Pitrou19690592009-06-12 20:14:08 +0000921class CBufferedReaderTest(BufferedReaderTest):
922 tp = io.BufferedReader
923
924 def test_constructor(self):
925 BufferedReaderTest.test_constructor(self)
926 # The allocation can succeed on 32-bit builds, e.g. with more
927 # than 2GB RAM and a 64-bit kernel.
928 if sys.maxsize > 0x7FFFFFFF:
929 rawio = self.MockRawIO()
930 bufio = self.tp(rawio)
931 self.assertRaises((OverflowError, MemoryError, ValueError),
932 bufio.__init__, rawio, sys.maxsize)
933
934 def test_initialization(self):
935 rawio = self.MockRawIO([b"abc"])
936 bufio = self.tp(rawio)
937 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
938 self.assertRaises(ValueError, bufio.read)
939 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
940 self.assertRaises(ValueError, bufio.read)
941 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
942 self.assertRaises(ValueError, bufio.read)
943
944 def test_misbehaved_io_read(self):
945 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
946 bufio = self.tp(rawio)
947 # _pyio.BufferedReader seems to implement reading different, so that
948 # checking this is not so easy.
949 self.assertRaises(IOError, bufio.read, 10)
950
951 def test_garbage_collection(self):
952 # C BufferedReader objects are collected.
953 # The Python version has __del__, so it ends into gc.garbage instead
954 rawio = self.FileIO(support.TESTFN, "w+b")
955 f = self.tp(rawio)
956 f.f = f
957 wr = weakref.ref(f)
958 del f
959 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000960 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +0000961
962class PyBufferedReaderTest(BufferedReaderTest):
963 tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000964
965
Antoine Pitrou19690592009-06-12 20:14:08 +0000966class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
967 write_mode = "wb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000968
Antoine Pitrou19690592009-06-12 20:14:08 +0000969 def test_constructor(self):
970 rawio = self.MockRawIO()
971 bufio = self.tp(rawio)
972 bufio.__init__(rawio)
973 bufio.__init__(rawio, buffer_size=1024)
974 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000975 self.assertEqual(3, bufio.write(b"abc"))
Antoine Pitrou19690592009-06-12 20:14:08 +0000976 bufio.flush()
977 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
978 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
979 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
980 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000981 self.assertEqual(3, bufio.write(b"ghi"))
Antoine Pitrou19690592009-06-12 20:14:08 +0000982 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +0000983 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000984
Antoine Pitrou19690592009-06-12 20:14:08 +0000985 def test_detach_flush(self):
986 raw = self.MockRawIO()
987 buf = self.tp(raw)
988 buf.write(b"howdy!")
989 self.assertFalse(raw._write_stack)
990 buf.detach()
991 self.assertEqual(raw._write_stack, [b"howdy!"])
992
993 def test_write(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000994 # Write to the buffered IO but don't overflow the buffer.
Antoine Pitrou19690592009-06-12 20:14:08 +0000995 writer = self.MockRawIO()
996 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000997 bufio.write(b"abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000998 self.assertFalse(writer._write_stack)
999
Antoine Pitrou19690592009-06-12 20:14:08 +00001000 def test_write_overflow(self):
1001 writer = self.MockRawIO()
1002 bufio = self.tp(writer, 8)
1003 contents = b"abcdefghijklmnop"
1004 for n in range(0, len(contents), 3):
1005 bufio.write(contents[n:n+3])
1006 flushed = b"".join(writer._write_stack)
1007 # At least (total - 8) bytes were implicitly flushed, perhaps more
1008 # depending on the implementation.
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001009 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001010
Antoine Pitrou19690592009-06-12 20:14:08 +00001011 def check_writes(self, intermediate_func):
1012 # Lots of writes, test the flushed output is as expected.
1013 contents = bytes(range(256)) * 1000
1014 n = 0
1015 writer = self.MockRawIO()
1016 bufio = self.tp(writer, 13)
1017 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1018 def gen_sizes():
1019 for size in count(1):
1020 for i in range(15):
1021 yield size
1022 sizes = gen_sizes()
1023 while n < len(contents):
1024 size = min(next(sizes), len(contents) - n)
Ezio Melotti2623a372010-11-21 13:34:58 +00001025 self.assertEqual(bufio.write(contents[n:n+size]), size)
Antoine Pitrou19690592009-06-12 20:14:08 +00001026 intermediate_func(bufio)
1027 n += size
1028 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001029 self.assertEqual(contents,
Antoine Pitrou19690592009-06-12 20:14:08 +00001030 b"".join(writer._write_stack))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001031
Antoine Pitrou19690592009-06-12 20:14:08 +00001032 def test_writes(self):
1033 self.check_writes(lambda bufio: None)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001034
Antoine Pitrou19690592009-06-12 20:14:08 +00001035 def test_writes_and_flushes(self):
1036 self.check_writes(lambda bufio: bufio.flush())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001037
Antoine Pitrou19690592009-06-12 20:14:08 +00001038 def test_writes_and_seeks(self):
1039 def _seekabs(bufio):
1040 pos = bufio.tell()
1041 bufio.seek(pos + 1, 0)
1042 bufio.seek(pos - 1, 0)
1043 bufio.seek(pos, 0)
1044 self.check_writes(_seekabs)
1045 def _seekrel(bufio):
1046 pos = bufio.seek(0, 1)
1047 bufio.seek(+1, 1)
1048 bufio.seek(-1, 1)
1049 bufio.seek(pos, 0)
1050 self.check_writes(_seekrel)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001051
Antoine Pitrou19690592009-06-12 20:14:08 +00001052 def test_writes_and_truncates(self):
1053 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001054
Antoine Pitrou19690592009-06-12 20:14:08 +00001055 def test_write_non_blocking(self):
1056 raw = self.MockNonBlockWriterIO()
1057 bufio = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001058
Ezio Melotti2623a372010-11-21 13:34:58 +00001059 self.assertEqual(bufio.write(b"abcd"), 4)
1060 self.assertEqual(bufio.write(b"efghi"), 5)
Antoine Pitrou19690592009-06-12 20:14:08 +00001061 # 1 byte will be written, the rest will be buffered
1062 raw.block_on(b"k")
Ezio Melotti2623a372010-11-21 13:34:58 +00001063 self.assertEqual(bufio.write(b"jklmn"), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001064
Antoine Pitrou19690592009-06-12 20:14:08 +00001065 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1066 raw.block_on(b"0")
1067 try:
1068 bufio.write(b"opqrwxyz0123456789")
1069 except self.BlockingIOError as e:
1070 written = e.characters_written
1071 else:
1072 self.fail("BlockingIOError should have been raised")
Ezio Melotti2623a372010-11-21 13:34:58 +00001073 self.assertEqual(written, 16)
1074 self.assertEqual(raw.pop_written(),
Antoine Pitrou19690592009-06-12 20:14:08 +00001075 b"abcdefghijklmnopqrwxyz")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001076
Ezio Melotti2623a372010-11-21 13:34:58 +00001077 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Antoine Pitrou19690592009-06-12 20:14:08 +00001078 s = raw.pop_written()
1079 # Previously buffered bytes were flushed
1080 self.assertTrue(s.startswith(b"01234567A"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001081
Antoine Pitrou19690592009-06-12 20:14:08 +00001082 def test_write_and_rewind(self):
1083 raw = io.BytesIO()
1084 bufio = self.tp(raw, 4)
1085 self.assertEqual(bufio.write(b"abcdef"), 6)
1086 self.assertEqual(bufio.tell(), 6)
1087 bufio.seek(0, 0)
1088 self.assertEqual(bufio.write(b"XY"), 2)
1089 bufio.seek(6, 0)
1090 self.assertEqual(raw.getvalue(), b"XYcdef")
1091 self.assertEqual(bufio.write(b"123456"), 6)
1092 bufio.flush()
1093 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001094
Antoine Pitrou19690592009-06-12 20:14:08 +00001095 def test_flush(self):
1096 writer = self.MockRawIO()
1097 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001098 bufio.write(b"abc")
1099 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001100 self.assertEqual(b"abc", writer._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001101
Antoine Pitrou19690592009-06-12 20:14:08 +00001102 def test_destructor(self):
1103 writer = self.MockRawIO()
1104 bufio = self.tp(writer, 8)
1105 bufio.write(b"abc")
1106 del bufio
1107 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001108 self.assertEqual(b"abc", writer._write_stack[0])
Antoine Pitrou19690592009-06-12 20:14:08 +00001109
1110 def test_truncate(self):
1111 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001112 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001113 bufio = self.tp(raw, 8)
1114 bufio.write(b"abcdef")
1115 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001116 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001117 with self.open(support.TESTFN, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001118 self.assertEqual(f.read(), b"abc")
1119
Victor Stinner6a102812010-04-27 23:55:59 +00001120 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +00001121 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +00001122 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001123 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001124 # Write out many bytes from many threads and test they were
1125 # all flushed.
1126 N = 1000
1127 contents = bytes(range(256)) * N
1128 sizes = cycle([1, 19])
1129 n = 0
1130 queue = deque()
1131 while n < len(contents):
1132 size = next(sizes)
1133 queue.append(contents[n:n+size])
1134 n += size
1135 del contents
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001136 # We use a real file object because it allows us to
1137 # exercise situations where the GIL is released before
1138 # writing the buffer to the raw streams. This is in addition
1139 # to concurrency issues due to switching threads in the middle
1140 # of Python code.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001141 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001142 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001143 errors = []
1144 def f():
1145 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001146 while True:
1147 try:
1148 s = queue.popleft()
1149 except IndexError:
1150 return
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001151 bufio.write(s)
1152 except Exception as e:
1153 errors.append(e)
1154 raise
1155 threads = [threading.Thread(target=f) for x in range(20)]
1156 for t in threads:
1157 t.start()
1158 time.sleep(0.02) # yield
1159 for t in threads:
1160 t.join()
1161 self.assertFalse(errors,
1162 "the following exceptions were caught: %r" % errors)
Antoine Pitrou19690592009-06-12 20:14:08 +00001163 bufio.close()
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001164 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001165 s = f.read()
1166 for i in range(256):
Ezio Melotti2623a372010-11-21 13:34:58 +00001167 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001168 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001169 support.unlink(support.TESTFN)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001170
Antoine Pitrou19690592009-06-12 20:14:08 +00001171 def test_misbehaved_io(self):
1172 rawio = self.MisbehavedRawIO()
1173 bufio = self.tp(rawio, 5)
1174 self.assertRaises(IOError, bufio.seek, 0)
1175 self.assertRaises(IOError, bufio.tell)
1176 self.assertRaises(IOError, bufio.write, b"abcdef")
1177
1178 def test_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001179 with support.check_warnings(("max_buffer_size is deprecated",
1180 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001181 self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001182
1183
1184class CBufferedWriterTest(BufferedWriterTest):
1185 tp = io.BufferedWriter
1186
1187 def test_constructor(self):
1188 BufferedWriterTest.test_constructor(self)
1189 # The allocation can succeed on 32-bit builds, e.g. with more
1190 # than 2GB RAM and a 64-bit kernel.
1191 if sys.maxsize > 0x7FFFFFFF:
1192 rawio = self.MockRawIO()
1193 bufio = self.tp(rawio)
1194 self.assertRaises((OverflowError, MemoryError, ValueError),
1195 bufio.__init__, rawio, sys.maxsize)
1196
1197 def test_initialization(self):
1198 rawio = self.MockRawIO()
1199 bufio = self.tp(rawio)
1200 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1201 self.assertRaises(ValueError, bufio.write, b"def")
1202 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1203 self.assertRaises(ValueError, bufio.write, b"def")
1204 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1205 self.assertRaises(ValueError, bufio.write, b"def")
1206
1207 def test_garbage_collection(self):
1208 # C BufferedWriter objects are collected, and collecting them flushes
1209 # all data to disk.
1210 # The Python version has __del__, so it ends into gc.garbage instead
1211 rawio = self.FileIO(support.TESTFN, "w+b")
1212 f = self.tp(rawio)
1213 f.write(b"123xxx")
1214 f.x = f
1215 wr = weakref.ref(f)
1216 del f
1217 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001218 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001219 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001220 self.assertEqual(f.read(), b"123xxx")
1221
1222
1223class PyBufferedWriterTest(BufferedWriterTest):
1224 tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001225
1226class BufferedRWPairTest(unittest.TestCase):
1227
Antoine Pitrou19690592009-06-12 20:14:08 +00001228 def test_constructor(self):
1229 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001230 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001231
Antoine Pitrou19690592009-06-12 20:14:08 +00001232 def test_detach(self):
1233 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1234 self.assertRaises(self.UnsupportedOperation, pair.detach)
1235
1236 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001237 with support.check_warnings(("max_buffer_size is deprecated",
1238 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001239 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001240
1241 def test_constructor_with_not_readable(self):
1242 class NotReadable(MockRawIO):
1243 def readable(self):
1244 return False
1245
1246 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1247
1248 def test_constructor_with_not_writeable(self):
1249 class NotWriteable(MockRawIO):
1250 def writable(self):
1251 return False
1252
1253 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1254
1255 def test_read(self):
1256 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1257
1258 self.assertEqual(pair.read(3), b"abc")
1259 self.assertEqual(pair.read(1), b"d")
1260 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001261 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1262 self.assertEqual(pair.read(None), b"abc")
1263
1264 def test_readlines(self):
1265 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1266 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1267 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1268 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001269
1270 def test_read1(self):
1271 # .read1() is delegated to the underlying reader object, so this test
1272 # can be shallow.
1273 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1274
1275 self.assertEqual(pair.read1(3), b"abc")
1276
1277 def test_readinto(self):
1278 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1279
1280 data = bytearray(5)
1281 self.assertEqual(pair.readinto(data), 5)
1282 self.assertEqual(data, b"abcde")
1283
1284 def test_write(self):
1285 w = self.MockRawIO()
1286 pair = self.tp(self.MockRawIO(), w)
1287
1288 pair.write(b"abc")
1289 pair.flush()
1290 pair.write(b"def")
1291 pair.flush()
1292 self.assertEqual(w._write_stack, [b"abc", b"def"])
1293
1294 def test_peek(self):
1295 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1296
1297 self.assertTrue(pair.peek(3).startswith(b"abc"))
1298 self.assertEqual(pair.read(3), b"abc")
1299
1300 def test_readable(self):
1301 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1302 self.assertTrue(pair.readable())
1303
1304 def test_writeable(self):
1305 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1306 self.assertTrue(pair.writable())
1307
1308 def test_seekable(self):
1309 # BufferedRWPairs are never seekable, even if their readers and writers
1310 # are.
1311 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1312 self.assertFalse(pair.seekable())
1313
1314 # .flush() is delegated to the underlying writer object and has been
1315 # tested in the test_write method.
1316
1317 def test_close_and_closed(self):
1318 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1319 self.assertFalse(pair.closed)
1320 pair.close()
1321 self.assertTrue(pair.closed)
1322
1323 def test_isatty(self):
1324 class SelectableIsAtty(MockRawIO):
1325 def __init__(self, isatty):
1326 MockRawIO.__init__(self)
1327 self._isatty = isatty
1328
1329 def isatty(self):
1330 return self._isatty
1331
1332 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1333 self.assertFalse(pair.isatty())
1334
1335 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1336 self.assertTrue(pair.isatty())
1337
1338 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1339 self.assertTrue(pair.isatty())
1340
1341 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1342 self.assertTrue(pair.isatty())
1343
1344class CBufferedRWPairTest(BufferedRWPairTest):
1345 tp = io.BufferedRWPair
1346
1347class PyBufferedRWPairTest(BufferedRWPairTest):
1348 tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001349
1350
Antoine Pitrou19690592009-06-12 20:14:08 +00001351class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1352 read_mode = "rb+"
1353 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001354
Antoine Pitrou19690592009-06-12 20:14:08 +00001355 def test_constructor(self):
1356 BufferedReaderTest.test_constructor(self)
1357 BufferedWriterTest.test_constructor(self)
1358
1359 def test_read_and_write(self):
1360 raw = self.MockRawIO((b"asdf", b"ghjk"))
1361 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001362
1363 self.assertEqual(b"as", rw.read(2))
1364 rw.write(b"ddd")
1365 rw.write(b"eee")
1366 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001367 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001368 self.assertEqual(b"dddeee", raw._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001369
Antoine Pitrou19690592009-06-12 20:14:08 +00001370 def test_seek_and_tell(self):
1371 raw = self.BytesIO(b"asdfghjkl")
1372 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001373
Ezio Melotti2623a372010-11-21 13:34:58 +00001374 self.assertEqual(b"as", rw.read(2))
1375 self.assertEqual(2, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001376 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001377 self.assertEqual(b"asdf", rw.read(4))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001378
Antoine Pitrou808cec52011-08-20 15:40:58 +02001379 rw.write(b"123f")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001380 rw.seek(0, 0)
Antoine Pitrou808cec52011-08-20 15:40:58 +02001381 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001382 self.assertEqual(9, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001383 rw.seek(-4, 2)
Ezio Melotti2623a372010-11-21 13:34:58 +00001384 self.assertEqual(5, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001385 rw.seek(2, 1)
Ezio Melotti2623a372010-11-21 13:34:58 +00001386 self.assertEqual(7, rw.tell())
1387 self.assertEqual(b"fl", rw.read(11))
Antoine Pitrou808cec52011-08-20 15:40:58 +02001388 rw.flush()
1389 self.assertEqual(b"asdf123fl", raw.getvalue())
1390
Christian Heimes1a6387e2008-03-26 12:49:49 +00001391 self.assertRaises(TypeError, rw.seek, 0.0)
1392
Antoine Pitrou19690592009-06-12 20:14:08 +00001393 def check_flush_and_read(self, read_func):
1394 raw = self.BytesIO(b"abcdefghi")
1395 bufio = self.tp(raw)
1396
Ezio Melotti2623a372010-11-21 13:34:58 +00001397 self.assertEqual(b"ab", read_func(bufio, 2))
Antoine Pitrou19690592009-06-12 20:14:08 +00001398 bufio.write(b"12")
Ezio Melotti2623a372010-11-21 13:34:58 +00001399 self.assertEqual(b"ef", read_func(bufio, 2))
1400 self.assertEqual(6, bufio.tell())
Antoine Pitrou19690592009-06-12 20:14:08 +00001401 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001402 self.assertEqual(6, bufio.tell())
1403 self.assertEqual(b"ghi", read_func(bufio))
Antoine Pitrou19690592009-06-12 20:14:08 +00001404 raw.seek(0, 0)
1405 raw.write(b"XYZ")
1406 # flush() resets the read buffer
1407 bufio.flush()
1408 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001409 self.assertEqual(b"XYZ", read_func(bufio, 3))
Antoine Pitrou19690592009-06-12 20:14:08 +00001410
1411 def test_flush_and_read(self):
1412 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1413
1414 def test_flush_and_readinto(self):
1415 def _readinto(bufio, n=-1):
1416 b = bytearray(n if n >= 0 else 9999)
1417 n = bufio.readinto(b)
1418 return bytes(b[:n])
1419 self.check_flush_and_read(_readinto)
1420
1421 def test_flush_and_peek(self):
1422 def _peek(bufio, n=-1):
1423 # This relies on the fact that the buffer can contain the whole
1424 # raw stream, otherwise peek() can return less.
1425 b = bufio.peek(n)
1426 if n != -1:
1427 b = b[:n]
1428 bufio.seek(len(b), 1)
1429 return b
1430 self.check_flush_and_read(_peek)
1431
1432 def test_flush_and_write(self):
1433 raw = self.BytesIO(b"abcdefghi")
1434 bufio = self.tp(raw)
1435
1436 bufio.write(b"123")
1437 bufio.flush()
1438 bufio.write(b"45")
1439 bufio.flush()
1440 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001441 self.assertEqual(b"12345fghi", raw.getvalue())
1442 self.assertEqual(b"12345fghi", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +00001443
1444 def test_threads(self):
1445 BufferedReaderTest.test_threads(self)
1446 BufferedWriterTest.test_threads(self)
1447
1448 def test_writes_and_peek(self):
1449 def _peek(bufio):
1450 bufio.peek(1)
1451 self.check_writes(_peek)
1452 def _peek(bufio):
1453 pos = bufio.tell()
1454 bufio.seek(-1, 1)
1455 bufio.peek(1)
1456 bufio.seek(pos, 0)
1457 self.check_writes(_peek)
1458
1459 def test_writes_and_reads(self):
1460 def _read(bufio):
1461 bufio.seek(-1, 1)
1462 bufio.read(1)
1463 self.check_writes(_read)
1464
1465 def test_writes_and_read1s(self):
1466 def _read1(bufio):
1467 bufio.seek(-1, 1)
1468 bufio.read1(1)
1469 self.check_writes(_read1)
1470
1471 def test_writes_and_readintos(self):
1472 def _read(bufio):
1473 bufio.seek(-1, 1)
1474 bufio.readinto(bytearray(1))
1475 self.check_writes(_read)
1476
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001477 def test_write_after_readahead(self):
1478 # Issue #6629: writing after the buffer was filled by readahead should
1479 # first rewind the raw stream.
1480 for overwrite_size in [1, 5]:
1481 raw = self.BytesIO(b"A" * 10)
1482 bufio = self.tp(raw, 4)
1483 # Trigger readahead
1484 self.assertEqual(bufio.read(1), b"A")
1485 self.assertEqual(bufio.tell(), 1)
1486 # Overwriting should rewind the raw stream if it needs so
1487 bufio.write(b"B" * overwrite_size)
1488 self.assertEqual(bufio.tell(), overwrite_size + 1)
1489 # If the write size was smaller than the buffer size, flush() and
1490 # check that rewind happens.
1491 bufio.flush()
1492 self.assertEqual(bufio.tell(), overwrite_size + 1)
1493 s = raw.getvalue()
1494 self.assertEqual(s,
1495 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1496
Antoine Pitrouee46a7b2011-05-13 00:31:52 +02001497 def test_write_rewind_write(self):
1498 # Various combinations of reading / writing / seeking backwards / writing again
1499 def mutate(bufio, pos1, pos2):
1500 assert pos2 >= pos1
1501 # Fill the buffer
1502 bufio.seek(pos1)
1503 bufio.read(pos2 - pos1)
1504 bufio.write(b'\x02')
1505 # This writes earlier than the previous write, but still inside
1506 # the buffer.
1507 bufio.seek(pos1)
1508 bufio.write(b'\x01')
1509
1510 b = b"\x80\x81\x82\x83\x84"
1511 for i in range(0, len(b)):
1512 for j in range(i, len(b)):
1513 raw = self.BytesIO(b)
1514 bufio = self.tp(raw, 100)
1515 mutate(bufio, i, j)
1516 bufio.flush()
1517 expected = bytearray(b)
1518 expected[j] = 2
1519 expected[i] = 1
1520 self.assertEqual(raw.getvalue(), expected,
1521 "failed result for i=%d, j=%d" % (i, j))
1522
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001523 def test_truncate_after_read_or_write(self):
1524 raw = self.BytesIO(b"A" * 10)
1525 bufio = self.tp(raw, 100)
1526 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1527 self.assertEqual(bufio.truncate(), 2)
1528 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1529 self.assertEqual(bufio.truncate(), 4)
1530
Antoine Pitrou19690592009-06-12 20:14:08 +00001531 def test_misbehaved_io(self):
1532 BufferedReaderTest.test_misbehaved_io(self)
1533 BufferedWriterTest.test_misbehaved_io(self)
1534
Antoine Pitrou808cec52011-08-20 15:40:58 +02001535 def test_interleaved_read_write(self):
1536 # Test for issue #12213
1537 with self.BytesIO(b'abcdefgh') as raw:
1538 with self.tp(raw, 100) as f:
1539 f.write(b"1")
1540 self.assertEqual(f.read(1), b'b')
1541 f.write(b'2')
1542 self.assertEqual(f.read1(1), b'd')
1543 f.write(b'3')
1544 buf = bytearray(1)
1545 f.readinto(buf)
1546 self.assertEqual(buf, b'f')
1547 f.write(b'4')
1548 self.assertEqual(f.peek(1), b'h')
1549 f.flush()
1550 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1551
1552 with self.BytesIO(b'abc') as raw:
1553 with self.tp(raw, 100) as f:
1554 self.assertEqual(f.read(1), b'a')
1555 f.write(b"2")
1556 self.assertEqual(f.read(1), b'c')
1557 f.flush()
1558 self.assertEqual(raw.getvalue(), b'a2c')
1559
1560 def test_interleaved_readline_write(self):
1561 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1562 with self.tp(raw) as f:
1563 f.write(b'1')
1564 self.assertEqual(f.readline(), b'b\n')
1565 f.write(b'2')
1566 self.assertEqual(f.readline(), b'def\n')
1567 f.write(b'3')
1568 self.assertEqual(f.readline(), b'\n')
1569 f.flush()
1570 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1571
1572
Antoine Pitrou19690592009-06-12 20:14:08 +00001573class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
1574 tp = io.BufferedRandom
1575
1576 def test_constructor(self):
1577 BufferedRandomTest.test_constructor(self)
1578 # The allocation can succeed on 32-bit builds, e.g. with more
1579 # than 2GB RAM and a 64-bit kernel.
1580 if sys.maxsize > 0x7FFFFFFF:
1581 rawio = self.MockRawIO()
1582 bufio = self.tp(rawio)
1583 self.assertRaises((OverflowError, MemoryError, ValueError),
1584 bufio.__init__, rawio, sys.maxsize)
1585
1586 def test_garbage_collection(self):
1587 CBufferedReaderTest.test_garbage_collection(self)
1588 CBufferedWriterTest.test_garbage_collection(self)
1589
1590class PyBufferedRandomTest(BufferedRandomTest):
1591 tp = pyio.BufferedRandom
1592
1593
Christian Heimes1a6387e2008-03-26 12:49:49 +00001594# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1595# properties:
1596# - A single output character can correspond to many bytes of input.
1597# - The number of input bytes to complete the character can be
1598# undetermined until the last input byte is received.
1599# - The number of input bytes can vary depending on previous input.
1600# - A single input byte can correspond to many characters of output.
1601# - The number of output characters can be undetermined until the
1602# last input byte is received.
1603# - The number of output characters can vary depending on previous input.
1604
1605class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1606 """
1607 For testing seek/tell behavior with a stateful, buffering decoder.
1608
1609 Input is a sequence of words. Words may be fixed-length (length set
1610 by input) or variable-length (period-terminated). In variable-length
1611 mode, extra periods are ignored. Possible words are:
1612 - 'i' followed by a number sets the input length, I (maximum 99).
1613 When I is set to 0, words are space-terminated.
1614 - 'o' followed by a number sets the output length, O (maximum 99).
1615 - Any other word is converted into a word followed by a period on
1616 the output. The output word consists of the input word truncated
1617 or padded out with hyphens to make its length equal to O. If O
1618 is 0, the word is output verbatim without truncating or padding.
1619 I and O are initially set to 1. When I changes, any buffered input is
1620 re-scanned according to the new I. EOF also terminates the last word.
1621 """
1622
1623 def __init__(self, errors='strict'):
1624 codecs.IncrementalDecoder.__init__(self, errors)
1625 self.reset()
1626
1627 def __repr__(self):
1628 return '<SID %x>' % id(self)
1629
1630 def reset(self):
1631 self.i = 1
1632 self.o = 1
1633 self.buffer = bytearray()
1634
1635 def getstate(self):
1636 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1637 return bytes(self.buffer), i*100 + o
1638
1639 def setstate(self, state):
1640 buffer, io = state
1641 self.buffer = bytearray(buffer)
1642 i, o = divmod(io, 100)
1643 self.i, self.o = i ^ 1, o ^ 1
1644
1645 def decode(self, input, final=False):
1646 output = ''
1647 for b in input:
1648 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001649 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +00001650 if self.buffer:
1651 output += self.process_word()
1652 else:
1653 self.buffer.append(b)
1654 else: # fixed-length, terminate after self.i bytes
1655 self.buffer.append(b)
1656 if len(self.buffer) == self.i:
1657 output += self.process_word()
1658 if final and self.buffer: # EOF terminates the last word
1659 output += self.process_word()
1660 return output
1661
1662 def process_word(self):
1663 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001664 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001665 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001666 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001667 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1668 else:
1669 output = self.buffer.decode('ascii')
1670 if len(output) < self.o:
1671 output += '-'*self.o # pad out with hyphens
1672 if self.o:
1673 output = output[:self.o] # truncate to output length
1674 output += '.'
1675 self.buffer = bytearray()
1676 return output
1677
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001678 codecEnabled = False
1679
1680 @classmethod
1681 def lookupTestDecoder(cls, name):
1682 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001683 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001684 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001685 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001686 incrementalencoder=None,
1687 streamreader=None, streamwriter=None,
1688 incrementaldecoder=cls)
1689
1690# Register the previous decoder for testing.
1691# Disabled by default, tests will enable it.
1692codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1693
1694
Christian Heimes1a6387e2008-03-26 12:49:49 +00001695class StatefulIncrementalDecoderTest(unittest.TestCase):
1696 """
1697 Make sure the StatefulIncrementalDecoder actually works.
1698 """
1699
1700 test_cases = [
1701 # I=1, O=1 (fixed-length input == fixed-length output)
1702 (b'abcd', False, 'a.b.c.d.'),
1703 # I=0, O=0 (variable-length input, variable-length output)
1704 (b'oiabcd', True, 'abcd.'),
1705 # I=0, O=0 (should ignore extra periods)
1706 (b'oi...abcd...', True, 'abcd.'),
1707 # I=0, O=6 (variable-length input, fixed-length output)
1708 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1709 # I=2, O=6 (fixed-length input < fixed-length output)
1710 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1711 # I=6, O=3 (fixed-length input > fixed-length output)
1712 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1713 # I=0, then 3; O=29, then 15 (with longer output)
1714 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1715 'a----------------------------.' +
1716 'b----------------------------.' +
1717 'cde--------------------------.' +
1718 'abcdefghijabcde.' +
1719 'a.b------------.' +
1720 '.c.------------.' +
1721 'd.e------------.' +
1722 'k--------------.' +
1723 'l--------------.' +
1724 'm--------------.')
1725 ]
1726
Antoine Pitrou19690592009-06-12 20:14:08 +00001727 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001728 # Try a few one-shot test cases.
1729 for input, eof, output in self.test_cases:
1730 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001731 self.assertEqual(d.decode(input, eof), output)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001732
1733 # Also test an unfinished decode, followed by forcing EOF.
1734 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001735 self.assertEqual(d.decode(b'oiabcd'), '')
1736 self.assertEqual(d.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001737
1738class TextIOWrapperTest(unittest.TestCase):
1739
1740 def setUp(self):
1741 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1742 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00001743 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001744
1745 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001746 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001747
Antoine Pitrou19690592009-06-12 20:14:08 +00001748 def test_constructor(self):
1749 r = self.BytesIO(b"\xc3\xa9\n\n")
1750 b = self.BufferedReader(r, 1000)
1751 t = self.TextIOWrapper(b)
1752 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti2623a372010-11-21 13:34:58 +00001753 self.assertEqual(t.encoding, "latin1")
1754 self.assertEqual(t.line_buffering, False)
Antoine Pitrou19690592009-06-12 20:14:08 +00001755 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti2623a372010-11-21 13:34:58 +00001756 self.assertEqual(t.encoding, "utf8")
1757 self.assertEqual(t.line_buffering, True)
1758 self.assertEqual("\xe9\n", t.readline())
Antoine Pitrou19690592009-06-12 20:14:08 +00001759 self.assertRaises(TypeError, t.__init__, b, newline=42)
1760 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1761
1762 def test_detach(self):
1763 r = self.BytesIO()
1764 b = self.BufferedWriter(r)
1765 t = self.TextIOWrapper(b)
1766 self.assertIs(t.detach(), b)
1767
1768 t = self.TextIOWrapper(b, encoding="ascii")
1769 t.write("howdy")
1770 self.assertFalse(r.getvalue())
1771 t.detach()
1772 self.assertEqual(r.getvalue(), b"howdy")
1773 self.assertRaises(ValueError, t.detach)
1774
1775 def test_repr(self):
1776 raw = self.BytesIO("hello".encode("utf-8"))
1777 b = self.BufferedReader(raw)
1778 t = self.TextIOWrapper(b, encoding="utf-8")
1779 modname = self.TextIOWrapper.__module__
1780 self.assertEqual(repr(t),
1781 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1782 raw.name = "dummy"
1783 self.assertEqual(repr(t),
1784 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1785 raw.name = b"dummy"
1786 self.assertEqual(repr(t),
1787 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1788
1789 def test_line_buffering(self):
1790 r = self.BytesIO()
1791 b = self.BufferedWriter(r, 1000)
1792 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1793 t.write("X")
Ezio Melotti2623a372010-11-21 13:34:58 +00001794 self.assertEqual(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001795 t.write("Y\nZ")
Ezio Melotti2623a372010-11-21 13:34:58 +00001796 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001797 t.write("A\rB")
Ezio Melotti2623a372010-11-21 13:34:58 +00001798 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001799
Antoine Pitrou19690592009-06-12 20:14:08 +00001800 def test_encoding(self):
1801 # Check the encoding attribute is always set, and valid
1802 b = self.BytesIO()
1803 t = self.TextIOWrapper(b, encoding="utf8")
1804 self.assertEqual(t.encoding, "utf8")
1805 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001806 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001807 codecs.lookup(t.encoding)
1808
1809 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001810 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001811 b = self.BytesIO(b"abc\n\xff\n")
1812 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001813 self.assertRaises(UnicodeError, t.read)
1814 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001815 b = self.BytesIO(b"abc\n\xff\n")
1816 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001817 self.assertRaises(UnicodeError, t.read)
1818 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001819 b = self.BytesIO(b"abc\n\xff\n")
1820 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti2623a372010-11-21 13:34:58 +00001821 self.assertEqual(t.read(), "abc\n\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001822 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001823 b = self.BytesIO(b"abc\n\xff\n")
1824 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti2623a372010-11-21 13:34:58 +00001825 self.assertEqual(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001826
Antoine Pitrou19690592009-06-12 20:14:08 +00001827 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001828 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001829 b = self.BytesIO()
1830 t = self.TextIOWrapper(b, encoding="ascii")
1831 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001832 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001833 b = self.BytesIO()
1834 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1835 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001836 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001837 b = self.BytesIO()
1838 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001839 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001840 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001841 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001842 self.assertEqual(b.getvalue(), b"abcdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001843 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001844 b = self.BytesIO()
1845 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001846 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001847 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001848 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001849 self.assertEqual(b.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001850
Antoine Pitrou19690592009-06-12 20:14:08 +00001851 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001852 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1853
1854 tests = [
1855 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1856 [ '', input_lines ],
1857 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1858 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1859 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1860 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001861 encodings = (
1862 'utf-8', 'latin-1',
1863 'utf-16', 'utf-16-le', 'utf-16-be',
1864 'utf-32', 'utf-32-le', 'utf-32-be',
1865 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00001866
1867 # Try a range of buffer sizes to test the case where \r is the last
1868 # character in TextIOWrapper._pending_line.
1869 for encoding in encodings:
1870 # XXX: str.encode() should return bytes
1871 data = bytes(''.join(input_lines).encode(encoding))
1872 for do_reads in (False, True):
1873 for bufsize in range(1, 10):
1874 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001875 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1876 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00001877 encoding=encoding)
1878 if do_reads:
1879 got_lines = []
1880 while True:
1881 c2 = textio.read(2)
1882 if c2 == '':
1883 break
Ezio Melotti2623a372010-11-21 13:34:58 +00001884 self.assertEqual(len(c2), 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001885 got_lines.append(c2 + textio.readline())
1886 else:
1887 got_lines = list(textio)
1888
1889 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti2623a372010-11-21 13:34:58 +00001890 self.assertEqual(got_line, exp_line)
1891 self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001892
Antoine Pitrou19690592009-06-12 20:14:08 +00001893 def test_newlines_input(self):
1894 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001895 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1896 for newline, expected in [
1897 (None, normalized.decode("ascii").splitlines(True)),
1898 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00001899 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1900 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1901 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00001902 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00001903 buf = self.BytesIO(testdata)
1904 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti2623a372010-11-21 13:34:58 +00001905 self.assertEqual(txt.readlines(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001906 txt.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001907 self.assertEqual(txt.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001908
Antoine Pitrou19690592009-06-12 20:14:08 +00001909 def test_newlines_output(self):
1910 testdict = {
1911 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1912 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1913 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1914 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1915 }
1916 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1917 for newline, expected in tests:
1918 buf = self.BytesIO()
1919 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1920 txt.write("AAA\nB")
1921 txt.write("BB\nCCC\n")
1922 txt.write("X\rY\r\nZ")
1923 txt.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001924 self.assertEqual(buf.closed, False)
1925 self.assertEqual(buf.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00001926
1927 def test_destructor(self):
1928 l = []
1929 base = self.BytesIO
1930 class MyBytesIO(base):
1931 def close(self):
1932 l.append(self.getvalue())
1933 base.close(self)
1934 b = MyBytesIO()
1935 t = self.TextIOWrapper(b, encoding="ascii")
1936 t.write("abc")
1937 del t
1938 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001939 self.assertEqual([b"abc"], l)
Antoine Pitrou19690592009-06-12 20:14:08 +00001940
1941 def test_override_destructor(self):
1942 record = []
1943 class MyTextIO(self.TextIOWrapper):
1944 def __del__(self):
1945 record.append(1)
1946 try:
1947 f = super(MyTextIO, self).__del__
1948 except AttributeError:
1949 pass
1950 else:
1951 f()
1952 def close(self):
1953 record.append(2)
1954 super(MyTextIO, self).close()
1955 def flush(self):
1956 record.append(3)
1957 super(MyTextIO, self).flush()
1958 b = self.BytesIO()
1959 t = MyTextIO(b, encoding="ascii")
1960 del t
1961 support.gc_collect()
1962 self.assertEqual(record, [1, 2, 3])
1963
1964 def test_error_through_destructor(self):
1965 # Test that the exception state is not modified by a destructor,
1966 # even if close() fails.
1967 rawio = self.CloseFailureIO()
1968 def f():
1969 self.TextIOWrapper(rawio).xyzzy
1970 with support.captured_output("stderr") as s:
1971 self.assertRaises(AttributeError, f)
1972 s = s.getvalue().strip()
1973 if s:
1974 # The destructor *may* have printed an unraisable error, check it
1975 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001976 self.assertTrue(s.startswith("Exception IOError: "), s)
1977 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001978
1979 # Systematic tests of the text I/O API
1980
Antoine Pitrou19690592009-06-12 20:14:08 +00001981 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001982 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1983 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00001984 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001985 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00001986 self.assertEqual(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001987 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001988 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001989 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00001990 self.assertEqual(f.tell(), 0)
1991 self.assertEqual(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001992 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00001993 self.assertEqual(f.seek(0), 0)
1994 self.assertEqual(f.read(None), "abc")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001995 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001996 self.assertEqual(f.read(2), "ab")
1997 self.assertEqual(f.read(1), "c")
1998 self.assertEqual(f.read(1), "")
1999 self.assertEqual(f.read(), "")
2000 self.assertEqual(f.tell(), cookie)
2001 self.assertEqual(f.seek(0), 0)
2002 self.assertEqual(f.seek(0, 2), cookie)
2003 self.assertEqual(f.write("def"), 3)
2004 self.assertEqual(f.seek(cookie), cookie)
2005 self.assertEqual(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002006 if enc.startswith("utf"):
2007 self.multi_line_test(f, enc)
2008 f.close()
2009
2010 def multi_line_test(self, f, enc):
2011 f.seek(0)
2012 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00002013 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002014 wlines = []
2015 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2016 chars = []
2017 for i in range(size):
2018 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00002019 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002020 wlines.append((f.tell(), line))
2021 f.write(line)
2022 f.seek(0)
2023 rlines = []
2024 while True:
2025 pos = f.tell()
2026 line = f.readline()
2027 if not line:
2028 break
2029 rlines.append((pos, line))
Ezio Melotti2623a372010-11-21 13:34:58 +00002030 self.assertEqual(rlines, wlines)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002031
Antoine Pitrou19690592009-06-12 20:14:08 +00002032 def test_telling(self):
2033 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002034 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002035 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002036 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002037 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002038 p2 = f.tell()
2039 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002040 self.assertEqual(f.tell(), p0)
2041 self.assertEqual(f.readline(), "\xff\n")
2042 self.assertEqual(f.tell(), p1)
2043 self.assertEqual(f.readline(), "\xff\n")
2044 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002045 f.seek(0)
2046 for line in f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002047 self.assertEqual(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002048 self.assertRaises(IOError, f.tell)
Ezio Melotti2623a372010-11-21 13:34:58 +00002049 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002050 f.close()
2051
Antoine Pitrou19690592009-06-12 20:14:08 +00002052 def test_seeking(self):
2053 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00002054 prefix_size = chunk_size - 2
2055 u_prefix = "a" * prefix_size
2056 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melotti2623a372010-11-21 13:34:58 +00002057 self.assertEqual(len(u_prefix), len(prefix))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002058 u_suffix = "\u8888\n"
2059 suffix = bytes(u_suffix.encode("utf-8"))
2060 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00002061 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002062 f.write(line*2)
2063 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002064 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002065 s = f.read(prefix_size)
Ezio Melotti2623a372010-11-21 13:34:58 +00002066 self.assertEqual(s, prefix.decode("ascii"))
2067 self.assertEqual(f.tell(), prefix_size)
2068 self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002069
Antoine Pitrou19690592009-06-12 20:14:08 +00002070 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002071 # Regression test for a specific bug
2072 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00002073 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002074 f.write(data)
2075 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002076 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002077 f._CHUNK_SIZE # Just test that it exists
2078 f._CHUNK_SIZE = 2
2079 f.readline()
2080 f.tell()
2081
Antoine Pitrou19690592009-06-12 20:14:08 +00002082 def test_seek_and_tell(self):
2083 #Test seek/tell using the StatefulIncrementalDecoder.
2084 # Make test faster by doing smaller seeks
2085 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00002086
Antoine Pitrou19690592009-06-12 20:14:08 +00002087 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002088 """Tell/seek to various points within a data stream and ensure
2089 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00002090 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00002091 f.write(data)
2092 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002093 f = self.open(support.TESTFN, encoding='test_decoder')
2094 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00002095 decoded = f.read()
2096 f.close()
2097
2098 for i in range(min_pos, len(decoded) + 1): # seek positions
2099 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00002100 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melotti2623a372010-11-21 13:34:58 +00002101 self.assertEqual(f.read(i), decoded[:i])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002102 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002103 self.assertEqual(f.read(j), decoded[i:i + j])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002104 f.seek(cookie)
Ezio Melotti2623a372010-11-21 13:34:58 +00002105 self.assertEqual(f.read(), decoded[i:])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002106 f.close()
2107
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002108 # Enable the test decoder.
2109 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00002110
2111 # Run the tests.
2112 try:
2113 # Try each test case.
2114 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00002115 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002116
2117 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00002118 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2119 offset = CHUNK_SIZE - len(input)//2
2120 prefix = b'.'*offset
2121 # Don't bother seeking into the prefix (takes too long).
2122 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00002123 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002124
2125 # Ensure our test decoder won't interfere with subsequent tests.
2126 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002127 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002128
Antoine Pitrou19690592009-06-12 20:14:08 +00002129 def test_encoded_writes(self):
2130 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002131 tests = ("utf-16",
2132 "utf-16-le",
2133 "utf-16-be",
2134 "utf-32",
2135 "utf-32-le",
2136 "utf-32-be")
2137 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002138 buf = self.BytesIO()
2139 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002140 # Check if the BOM is written only once (see issue1753).
2141 f.write(data)
2142 f.write(data)
2143 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002144 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002145 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002146 self.assertEqual(f.read(), data * 2)
2147 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002148
Antoine Pitrou19690592009-06-12 20:14:08 +00002149 def test_unreadable(self):
2150 class UnReadable(self.BytesIO):
2151 def readable(self):
2152 return False
2153 txt = self.TextIOWrapper(UnReadable())
2154 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002155
Antoine Pitrou19690592009-06-12 20:14:08 +00002156 def test_read_one_by_one(self):
2157 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002158 reads = ""
2159 while True:
2160 c = txt.read(1)
2161 if not c:
2162 break
2163 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002164 self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002165
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002166 def test_readlines(self):
2167 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2168 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2169 txt.seek(0)
2170 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2171 txt.seek(0)
2172 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2173
Christian Heimes1a6387e2008-03-26 12:49:49 +00002174 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002175 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002176 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002177 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002178 reads = ""
2179 while True:
2180 c = txt.read(128)
2181 if not c:
2182 break
2183 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002184 self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002185
2186 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002187 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002188
2189 # read one char at a time
2190 reads = ""
2191 while True:
2192 c = txt.read(1)
2193 if not c:
2194 break
2195 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002196 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002197
2198 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002199 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002200 txt._CHUNK_SIZE = 4
2201
2202 reads = ""
2203 while True:
2204 c = txt.read(4)
2205 if not c:
2206 break
2207 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002208 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002209
2210 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002211 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002212 txt._CHUNK_SIZE = 4
2213
2214 reads = txt.read(4)
2215 reads += txt.read(4)
2216 reads += txt.readline()
2217 reads += txt.readline()
2218 reads += txt.readline()
Ezio Melotti2623a372010-11-21 13:34:58 +00002219 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002220
2221 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002222 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002223 txt._CHUNK_SIZE = 4
2224
2225 reads = txt.read(4)
2226 reads += txt.read()
Ezio Melotti2623a372010-11-21 13:34:58 +00002227 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002228
2229 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002230 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002231 txt._CHUNK_SIZE = 4
2232
2233 reads = txt.read(4)
2234 pos = txt.tell()
2235 txt.seek(0)
2236 txt.seek(pos)
Ezio Melotti2623a372010-11-21 13:34:58 +00002237 self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002238
2239 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002240 buffer = self.BytesIO(self.testdata)
2241 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002242
2243 self.assertEqual(buffer.seekable(), txt.seekable())
2244
Antoine Pitrou19690592009-06-12 20:14:08 +00002245 def test_append_bom(self):
2246 # The BOM is not written again when appending to a non-empty file
2247 filename = support.TESTFN
2248 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2249 with self.open(filename, 'w', encoding=charset) as f:
2250 f.write('aaa')
2251 pos = f.tell()
2252 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002253 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002254
2255 with self.open(filename, 'a', encoding=charset) as f:
2256 f.write('xxx')
2257 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002258 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002259
Antoine Pitrou19690592009-06-12 20:14:08 +00002260 def test_seek_bom(self):
2261 # Same test, but when seeking manually
2262 filename = support.TESTFN
2263 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2264 with self.open(filename, 'w', encoding=charset) as f:
2265 f.write('aaa')
2266 pos = f.tell()
2267 with self.open(filename, 'r+', encoding=charset) as f:
2268 f.seek(pos)
2269 f.write('zzz')
2270 f.seek(0)
2271 f.write('bbb')
2272 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002273 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002274
2275 def test_errors_property(self):
2276 with self.open(support.TESTFN, "w") as f:
2277 self.assertEqual(f.errors, "strict")
2278 with self.open(support.TESTFN, "w", errors="replace") as f:
2279 self.assertEqual(f.errors, "replace")
2280
Victor Stinner6a102812010-04-27 23:55:59 +00002281 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002282 def test_threads_write(self):
2283 # Issue6750: concurrent writes could duplicate data
2284 event = threading.Event()
2285 with self.open(support.TESTFN, "w", buffering=1) as f:
2286 def run(n):
2287 text = "Thread%03d\n" % n
2288 event.wait()
2289 f.write(text)
2290 threads = [threading.Thread(target=lambda n=x: run(n))
2291 for x in range(20)]
2292 for t in threads:
2293 t.start()
2294 time.sleep(0.02)
2295 event.set()
2296 for t in threads:
2297 t.join()
2298 with self.open(support.TESTFN) as f:
2299 content = f.read()
2300 for n in range(20):
Ezio Melotti2623a372010-11-21 13:34:58 +00002301 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002302
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002303 def test_flush_error_on_close(self):
2304 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2305 def bad_flush():
2306 raise IOError()
2307 txt.flush = bad_flush
2308 self.assertRaises(IOError, txt.close) # exception not swallowed
2309
2310 def test_multi_close(self):
2311 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2312 txt.close()
2313 txt.close()
2314 txt.close()
2315 self.assertRaises(ValueError, txt.flush)
2316
Antoine Pitroufc9ead62010-12-21 21:26:55 +00002317 def test_readonly_attributes(self):
2318 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2319 buf = self.BytesIO(self.testdata)
2320 with self.assertRaises((AttributeError, TypeError)):
2321 txt.buffer = buf
2322
Antoine Pitrou19690592009-06-12 20:14:08 +00002323class CTextIOWrapperTest(TextIOWrapperTest):
2324
2325 def test_initialization(self):
2326 r = self.BytesIO(b"\xc3\xa9\n\n")
2327 b = self.BufferedReader(r, 1000)
2328 t = self.TextIOWrapper(b)
2329 self.assertRaises(TypeError, t.__init__, b, newline=42)
2330 self.assertRaises(ValueError, t.read)
2331 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2332 self.assertRaises(ValueError, t.read)
2333
2334 def test_garbage_collection(self):
2335 # C TextIOWrapper objects are collected, and collecting them flushes
2336 # all data to disk.
2337 # The Python version has __del__, so it ends in gc.garbage instead.
2338 rawio = io.FileIO(support.TESTFN, "wb")
2339 b = self.BufferedWriter(rawio)
2340 t = self.TextIOWrapper(b, encoding="ascii")
2341 t.write("456def")
2342 t.x = t
2343 wr = weakref.ref(t)
2344 del t
2345 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002346 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002347 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002348 self.assertEqual(f.read(), b"456def")
2349
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002350 def test_rwpair_cleared_before_textio(self):
2351 # Issue 13070: TextIOWrapper's finalization would crash when called
2352 # after the reference to the underlying BufferedRWPair's writer got
2353 # cleared by the GC.
2354 for i in range(1000):
2355 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2356 t1 = self.TextIOWrapper(b1, encoding="ascii")
2357 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2358 t2 = self.TextIOWrapper(b2, encoding="ascii")
2359 # circular references
2360 t1.buddy = t2
2361 t2.buddy = t1
2362 support.gc_collect()
2363
2364
Antoine Pitrou19690592009-06-12 20:14:08 +00002365class PyTextIOWrapperTest(TextIOWrapperTest):
2366 pass
2367
2368
2369class IncrementalNewlineDecoderTest(unittest.TestCase):
2370
2371 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002372 # UTF-8 specific tests for a newline decoder
2373 def _check_decode(b, s, **kwargs):
2374 # We exercise getstate() / setstate() as well as decode()
2375 state = decoder.getstate()
Ezio Melotti2623a372010-11-21 13:34:58 +00002376 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002377 decoder.setstate(state)
Ezio Melotti2623a372010-11-21 13:34:58 +00002378 self.assertEqual(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002379
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002380 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002381
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002382 _check_decode(b'\xe8', "")
2383 _check_decode(b'\xa2', "")
2384 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002385
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002386 _check_decode(b'\xe8', "")
2387 _check_decode(b'\xa2', "")
2388 _check_decode(b'\x88', "\u8888")
2389
2390 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002391 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2392
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002393 decoder.reset()
2394 _check_decode(b'\n', "\n")
2395 _check_decode(b'\r', "")
2396 _check_decode(b'', "\n", final=True)
2397 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002398
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002399 _check_decode(b'\r', "")
2400 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002401
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002402 _check_decode(b'\r\r\n', "\n\n")
2403 _check_decode(b'\r', "")
2404 _check_decode(b'\r', "\n")
2405 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002406
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002407 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2408 _check_decode(b'\xe8\xa2\x88', "\u8888")
2409 _check_decode(b'\n', "\n")
2410 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2411 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002412
Antoine Pitrou19690592009-06-12 20:14:08 +00002413 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002414 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002415 if encoding is not None:
2416 encoder = codecs.getincrementalencoder(encoding)()
2417 def _decode_bytewise(s):
2418 # Decode one byte at a time
2419 for b in encoder.encode(s):
2420 result.append(decoder.decode(b))
2421 else:
2422 encoder = None
2423 def _decode_bytewise(s):
2424 # Decode one char at a time
2425 for c in s:
2426 result.append(decoder.decode(c))
Ezio Melotti2623a372010-11-21 13:34:58 +00002427 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002428 _decode_bytewise("abc\n\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002429 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002430 _decode_bytewise("\nabc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002431 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002432 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002433 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002434 _decode_bytewise("abc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002435 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002436 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002437 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002438 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002439 input = "abc"
2440 if encoder is not None:
2441 encoder.reset()
2442 input = encoder.encode(input)
Ezio Melotti2623a372010-11-21 13:34:58 +00002443 self.assertEqual(decoder.decode(input), "abc")
2444 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002445
2446 def test_newline_decoder(self):
2447 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002448 # None meaning the IncrementalNewlineDecoder takes unicode input
2449 # rather than bytes input
2450 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002451 'utf-16', 'utf-16-le', 'utf-16-be',
2452 'utf-32', 'utf-32-le', 'utf-32-be',
2453 )
2454 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002455 decoder = enc and codecs.getincrementaldecoder(enc)()
2456 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2457 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002458 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002459 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2460 self.check_newline_decoding_utf8(decoder)
2461
2462 def test_newline_bytes(self):
2463 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2464 def _check(dec):
Ezio Melotti2623a372010-11-21 13:34:58 +00002465 self.assertEqual(dec.newlines, None)
2466 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2467 self.assertEqual(dec.newlines, None)
2468 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2469 self.assertEqual(dec.newlines, None)
Antoine Pitrou19690592009-06-12 20:14:08 +00002470 dec = self.IncrementalNewlineDecoder(None, translate=False)
2471 _check(dec)
2472 dec = self.IncrementalNewlineDecoder(None, translate=True)
2473 _check(dec)
2474
2475class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2476 pass
2477
2478class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2479 pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002480
Christian Heimes1a6387e2008-03-26 12:49:49 +00002481
2482# XXX Tests for open()
2483
2484class MiscIOTest(unittest.TestCase):
2485
Benjamin Petersonad100c32008-11-20 22:06:22 +00002486 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002487 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002488
Antoine Pitrou19690592009-06-12 20:14:08 +00002489 def test___all__(self):
2490 for name in self.io.__all__:
2491 obj = getattr(self.io, name, None)
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002492 self.assertTrue(obj is not None, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002493 if name == "open":
2494 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002495 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002496 self.assertTrue(issubclass(obj, Exception), name)
2497 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002498 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002499
Benjamin Petersonad100c32008-11-20 22:06:22 +00002500 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002501 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002502 self.assertEqual(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002503 f.close()
2504
Antoine Pitrou19690592009-06-12 20:14:08 +00002505 f = self.open(support.TESTFN, "U")
Ezio Melotti2623a372010-11-21 13:34:58 +00002506 self.assertEqual(f.name, support.TESTFN)
2507 self.assertEqual(f.buffer.name, support.TESTFN)
2508 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2509 self.assertEqual(f.mode, "U")
2510 self.assertEqual(f.buffer.mode, "rb")
2511 self.assertEqual(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002512 f.close()
2513
Antoine Pitrou19690592009-06-12 20:14:08 +00002514 f = self.open(support.TESTFN, "w+")
Ezio Melotti2623a372010-11-21 13:34:58 +00002515 self.assertEqual(f.mode, "w+")
2516 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2517 self.assertEqual(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002518
Antoine Pitrou19690592009-06-12 20:14:08 +00002519 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melotti2623a372010-11-21 13:34:58 +00002520 self.assertEqual(g.mode, "wb")
2521 self.assertEqual(g.raw.mode, "wb")
2522 self.assertEqual(g.name, f.fileno())
2523 self.assertEqual(g.raw.name, f.fileno())
Benjamin Petersonad100c32008-11-20 22:06:22 +00002524 f.close()
2525 g.close()
2526
Antoine Pitrou19690592009-06-12 20:14:08 +00002527 def test_io_after_close(self):
2528 for kwargs in [
2529 {"mode": "w"},
2530 {"mode": "wb"},
2531 {"mode": "w", "buffering": 1},
2532 {"mode": "w", "buffering": 2},
2533 {"mode": "wb", "buffering": 0},
2534 {"mode": "r"},
2535 {"mode": "rb"},
2536 {"mode": "r", "buffering": 1},
2537 {"mode": "r", "buffering": 2},
2538 {"mode": "rb", "buffering": 0},
2539 {"mode": "w+"},
2540 {"mode": "w+b"},
2541 {"mode": "w+", "buffering": 1},
2542 {"mode": "w+", "buffering": 2},
2543 {"mode": "w+b", "buffering": 0},
2544 ]:
2545 f = self.open(support.TESTFN, **kwargs)
2546 f.close()
2547 self.assertRaises(ValueError, f.flush)
2548 self.assertRaises(ValueError, f.fileno)
2549 self.assertRaises(ValueError, f.isatty)
2550 self.assertRaises(ValueError, f.__iter__)
2551 if hasattr(f, "peek"):
2552 self.assertRaises(ValueError, f.peek, 1)
2553 self.assertRaises(ValueError, f.read)
2554 if hasattr(f, "read1"):
2555 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinner5100a402011-05-25 22:15:36 +02002556 if hasattr(f, "readall"):
2557 self.assertRaises(ValueError, f.readall)
Antoine Pitrou19690592009-06-12 20:14:08 +00002558 if hasattr(f, "readinto"):
2559 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2560 self.assertRaises(ValueError, f.readline)
2561 self.assertRaises(ValueError, f.readlines)
2562 self.assertRaises(ValueError, f.seek, 0)
2563 self.assertRaises(ValueError, f.tell)
2564 self.assertRaises(ValueError, f.truncate)
2565 self.assertRaises(ValueError, f.write,
2566 b"" if "b" in kwargs['mode'] else "")
2567 self.assertRaises(ValueError, f.writelines, [])
2568 self.assertRaises(ValueError, next, f)
2569
2570 def test_blockingioerror(self):
2571 # Various BlockingIOError issues
2572 self.assertRaises(TypeError, self.BlockingIOError)
2573 self.assertRaises(TypeError, self.BlockingIOError, 1)
2574 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2575 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2576 b = self.BlockingIOError(1, "")
2577 self.assertEqual(b.characters_written, 0)
2578 class C(unicode):
2579 pass
2580 c = C("")
2581 b = self.BlockingIOError(1, c)
2582 c.b = b
2583 b.c = c
2584 wr = weakref.ref(c)
2585 del c, b
2586 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002587 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002588
2589 def test_abcs(self):
2590 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002591 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2592 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2593 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2594 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002595
2596 def _check_abc_inheritance(self, abcmodule):
2597 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002598 self.assertIsInstance(f, abcmodule.IOBase)
2599 self.assertIsInstance(f, abcmodule.RawIOBase)
2600 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2601 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002602 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002603 self.assertIsInstance(f, abcmodule.IOBase)
2604 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2605 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2606 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002607 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002608 self.assertIsInstance(f, abcmodule.IOBase)
2609 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2610 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2611 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002612
2613 def test_abc_inheritance(self):
2614 # Test implementations inherit from their respective ABCs
2615 self._check_abc_inheritance(self)
2616
2617 def test_abc_inheritance_official(self):
2618 # Test implementations inherit from the official ABCs of the
2619 # baseline "io" module.
2620 self._check_abc_inheritance(io)
2621
Antoine Pitrou5aa7df32011-11-21 20:16:44 +01002622 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2623 def test_nonblock_pipe_write_bigbuf(self):
2624 self._test_nonblock_pipe_write(16*1024)
2625
2626 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2627 def test_nonblock_pipe_write_smallbuf(self):
2628 self._test_nonblock_pipe_write(1024)
2629
2630 def _set_non_blocking(self, fd):
2631 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2632 self.assertNotEqual(flags, -1)
2633 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2634 self.assertEqual(res, 0)
2635
2636 def _test_nonblock_pipe_write(self, bufsize):
2637 sent = []
2638 received = []
2639 r, w = os.pipe()
2640 self._set_non_blocking(r)
2641 self._set_non_blocking(w)
2642
2643 # To exercise all code paths in the C implementation we need
2644 # to play with buffer sizes. For instance, if we choose a
2645 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2646 # then we will never get a partial write of the buffer.
2647 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2648 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2649
2650 with rf, wf:
2651 for N in 9999, 73, 7574:
2652 try:
2653 i = 0
2654 while True:
2655 msg = bytes([i % 26 + 97]) * N
2656 sent.append(msg)
2657 wf.write(msg)
2658 i += 1
2659
2660 except self.BlockingIOError as e:
2661 self.assertEqual(e.args[0], errno.EAGAIN)
2662 sent[-1] = sent[-1][:e.characters_written]
2663 received.append(rf.read())
2664 msg = b'BLOCKED'
2665 wf.write(msg)
2666 sent.append(msg)
2667
2668 while True:
2669 try:
2670 wf.flush()
2671 break
2672 except self.BlockingIOError as e:
2673 self.assertEqual(e.args[0], errno.EAGAIN)
2674 self.assertEqual(e.characters_written, 0)
2675 received.append(rf.read())
2676
2677 received += iter(rf.read, None)
2678
2679 sent, received = b''.join(sent), b''.join(received)
2680 self.assertTrue(sent == received)
2681 self.assertTrue(wf.closed)
2682 self.assertTrue(rf.closed)
2683
Antoine Pitrou19690592009-06-12 20:14:08 +00002684class CMiscIOTest(MiscIOTest):
2685 io = io
2686
2687class PyMiscIOTest(MiscIOTest):
2688 io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002689
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002690
2691@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2692class SignalsTest(unittest.TestCase):
2693
2694 def setUp(self):
2695 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2696
2697 def tearDown(self):
2698 signal.signal(signal.SIGALRM, self.oldalrm)
2699
2700 def alarm_interrupt(self, sig, frame):
Florent Xicluna3fa3b002010-09-13 08:20:19 +00002701 1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002702
2703 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinner49d495f2011-07-04 11:44:46 +02002704 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
2705 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002706 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2707 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou6439c002011-02-25 21:35:47 +00002708 invokes the signal handler, and bubbles up the exception raised
2709 in the latter."""
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002710 read_results = []
2711 def _read():
2712 s = os.read(r, 1)
2713 read_results.append(s)
2714 t = threading.Thread(target=_read)
2715 t.daemon = True
2716 r, w = os.pipe()
2717 try:
2718 wio = self.io.open(w, **fdopen_kwargs)
2719 t.start()
2720 signal.alarm(1)
2721 # Fill the pipe enough that the write will be blocking.
2722 # It will be interrupted by the timer armed above. Since the
2723 # other thread has read one byte, the low-level write will
2724 # return with a successful (partial) result rather than an EINTR.
2725 # The buffered IO layer must check for pending signal
2726 # handlers, which in this case will invoke alarm_interrupt().
2727 self.assertRaises(ZeroDivisionError,
2728 wio.write, item * (1024 * 1024))
2729 t.join()
2730 # We got one byte, get another one and check that it isn't a
2731 # repeat of the first one.
2732 read_results.append(os.read(r, 1))
2733 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2734 finally:
2735 os.close(w)
2736 os.close(r)
2737 # This is deliberate. If we didn't close the file descriptor
2738 # before closing wio, wio would try to flush its internal
2739 # buffer, and block again.
2740 try:
2741 wio.close()
2742 except IOError as e:
2743 if e.errno != errno.EBADF:
2744 raise
2745
2746 def test_interrupted_write_unbuffered(self):
2747 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2748
2749 def test_interrupted_write_buffered(self):
2750 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2751
2752 def test_interrupted_write_text(self):
2753 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2754
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002755 def check_reentrant_write(self, data, **fdopen_kwargs):
2756 def on_alarm(*args):
2757 # Will be called reentrantly from the same thread
2758 wio.write(data)
Victor Stinner4c41f842011-07-05 13:29:26 +02002759 1//0
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002760 signal.signal(signal.SIGALRM, on_alarm)
2761 r, w = os.pipe()
2762 wio = self.io.open(w, **fdopen_kwargs)
2763 try:
2764 signal.alarm(1)
2765 # Either the reentrant call to wio.write() fails with RuntimeError,
2766 # or the signal handler raises ZeroDivisionError.
2767 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2768 while 1:
2769 for i in range(100):
2770 wio.write(data)
2771 wio.flush()
2772 # Make sure the buffer doesn't fill up and block further writes
2773 os.read(r, len(data) * 100)
2774 exc = cm.exception
2775 if isinstance(exc, RuntimeError):
2776 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2777 finally:
2778 wio.close()
2779 os.close(r)
2780
2781 def test_reentrant_write_buffered(self):
2782 self.check_reentrant_write(b"xy", mode="wb")
2783
2784 def test_reentrant_write_text(self):
2785 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2786
Antoine Pitrou6439c002011-02-25 21:35:47 +00002787 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2788 """Check that a buffered read, when it gets interrupted (either
2789 returning a partial result or EINTR), properly invokes the signal
2790 handler and retries if the latter returned successfully."""
2791 r, w = os.pipe()
2792 fdopen_kwargs["closefd"] = False
2793 def alarm_handler(sig, frame):
2794 os.write(w, b"bar")
2795 signal.signal(signal.SIGALRM, alarm_handler)
2796 try:
2797 rio = self.io.open(r, **fdopen_kwargs)
2798 os.write(w, b"foo")
2799 signal.alarm(1)
2800 # Expected behaviour:
2801 # - first raw read() returns partial b"foo"
2802 # - second raw read() returns EINTR
2803 # - third raw read() returns b"bar"
2804 self.assertEqual(decode(rio.read(6)), "foobar")
2805 finally:
2806 rio.close()
2807 os.close(w)
2808 os.close(r)
2809
2810 def test_interrupterd_read_retry_buffered(self):
2811 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
2812 mode="rb")
2813
2814 def test_interrupterd_read_retry_text(self):
2815 self.check_interrupted_read_retry(lambda x: x,
2816 mode="r")
2817
2818 @unittest.skipUnless(threading, 'Threading required for this test.')
2819 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
2820 """Check that a buffered write, when it gets interrupted (either
2821 returning a partial result or EINTR), properly invokes the signal
2822 handler and retries if the latter returned successfully."""
2823 select = support.import_module("select")
2824 # A quantity that exceeds the buffer size of an anonymous pipe's
2825 # write end.
2826 N = 1024 * 1024
2827 r, w = os.pipe()
2828 fdopen_kwargs["closefd"] = False
2829 # We need a separate thread to read from the pipe and allow the
2830 # write() to finish. This thread is started after the SIGALRM is
2831 # received (forcing a first EINTR in write()).
2832 read_results = []
2833 write_finished = False
2834 def _read():
2835 while not write_finished:
2836 while r in select.select([r], [], [], 1.0)[0]:
2837 s = os.read(r, 1024)
2838 read_results.append(s)
2839 t = threading.Thread(target=_read)
2840 t.daemon = True
2841 def alarm1(sig, frame):
2842 signal.signal(signal.SIGALRM, alarm2)
2843 signal.alarm(1)
2844 def alarm2(sig, frame):
2845 t.start()
2846 signal.signal(signal.SIGALRM, alarm1)
2847 try:
2848 wio = self.io.open(w, **fdopen_kwargs)
2849 signal.alarm(1)
2850 # Expected behaviour:
2851 # - first raw write() is partial (because of the limited pipe buffer
2852 # and the first alarm)
2853 # - second raw write() returns EINTR (because of the second alarm)
2854 # - subsequent write()s are successful (either partial or complete)
2855 self.assertEqual(N, wio.write(item * N))
2856 wio.flush()
2857 write_finished = True
2858 t.join()
2859 self.assertEqual(N, sum(len(x) for x in read_results))
2860 finally:
2861 write_finished = True
2862 os.close(w)
2863 os.close(r)
2864 # This is deliberate. If we didn't close the file descriptor
2865 # before closing wio, wio would try to flush its internal
2866 # buffer, and could block (in case of failure).
2867 try:
2868 wio.close()
2869 except IOError as e:
2870 if e.errno != errno.EBADF:
2871 raise
2872
2873 def test_interrupterd_write_retry_buffered(self):
2874 self.check_interrupted_write_retry(b"x", mode="wb")
2875
2876 def test_interrupterd_write_retry_text(self):
2877 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
2878
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002879
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002880class CSignalsTest(SignalsTest):
2881 io = io
2882
2883class PySignalsTest(SignalsTest):
2884 io = pyio
2885
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002886 # Handling reentrancy issues would slow down _pyio even more, so the
2887 # tests are disabled.
2888 test_reentrant_write_buffered = None
2889 test_reentrant_write_text = None
2890
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002891
Christian Heimes1a6387e2008-03-26 12:49:49 +00002892def test_main():
Antoine Pitrou19690592009-06-12 20:14:08 +00002893 tests = (CIOTest, PyIOTest,
2894 CBufferedReaderTest, PyBufferedReaderTest,
2895 CBufferedWriterTest, PyBufferedWriterTest,
2896 CBufferedRWPairTest, PyBufferedRWPairTest,
2897 CBufferedRandomTest, PyBufferedRandomTest,
2898 StatefulIncrementalDecoderTest,
2899 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2900 CTextIOWrapperTest, PyTextIOWrapperTest,
2901 CMiscIOTest, PyMiscIOTest,
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002902 CSignalsTest, PySignalsTest,
Antoine Pitrou19690592009-06-12 20:14:08 +00002903 )
2904
2905 # Put the namespaces of the IO module we are testing and some useful mock
2906 # classes in the __dict__ of each test.
2907 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou6391b342010-09-14 18:48:19 +00002908 MockNonBlockWriterIO, MockRawIOWithoutRead)
Antoine Pitrou19690592009-06-12 20:14:08 +00002909 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2910 c_io_ns = dict((name, getattr(io, name)) for name in all_members)
2911 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
2912 globs = globals()
2913 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2914 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2915 # Avoid turning open into a bound method.
2916 py_io_ns["open"] = pyio.OpenWrapper
2917 for test in tests:
2918 if test.__name__.startswith("C"):
2919 for name, obj in c_io_ns.items():
2920 setattr(test, name, obj)
2921 elif test.__name__.startswith("Py"):
2922 for name, obj in py_io_ns.items():
2923 setattr(test, name, obj)
2924
2925 support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002926
2927if __name__ == "__main__":
Antoine Pitrou19690592009-06-12 20:14:08 +00002928 test_main()