blob: 003a4f34bada2045e21397bcf5eba587f25c851d [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Antoine Pitrou19690592009-06-12 20:14:08 +000032import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000033import signal
34import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000035from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000036from collections import deque
37from test import test_support as support
Christian Heimes1a6387e2008-03-26 12:49:49 +000038
39import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000042try:
43 import threading
44except ImportError:
45 threading = None
Antoine Pitrou19690592009-06-12 20:14:08 +000046
47__metaclass__ = type
48bytes = support.py3k_bytes
49
50def _default_chunk_size():
51 """Get the default TextIOWrapper chunk size"""
52 with io.open(__file__, "r", encoding="latin1") as f:
53 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000054
55
Antoine Pitrou6391b342010-09-14 18:48:19 +000056class MockRawIOWithoutRead:
57 """A RawIO implementation without read(), so as to exercise the default
58 RawIO.read() which calls readinto()."""
Christian Heimes1a6387e2008-03-26 12:49:49 +000059
60 def __init__(self, read_stack=()):
61 self._read_stack = list(read_stack)
62 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +000063 self._reads = 0
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000064 self._extraneous_reads = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +000065
Christian Heimes1a6387e2008-03-26 12:49:49 +000066 def write(self, b):
Antoine Pitrou19690592009-06-12 20:14:08 +000067 self._write_stack.append(bytes(b))
Christian Heimes1a6387e2008-03-26 12:49:49 +000068 return len(b)
69
70 def writable(self):
71 return True
72
73 def fileno(self):
74 return 42
75
76 def readable(self):
77 return True
78
79 def seekable(self):
80 return True
81
82 def seek(self, pos, whence):
Antoine Pitrou19690592009-06-12 20:14:08 +000083 return 0 # wrong but we gotta return something
Christian Heimes1a6387e2008-03-26 12:49:49 +000084
85 def tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +000086 return 0 # same comment as above
87
88 def readinto(self, buf):
89 self._reads += 1
90 max_len = len(buf)
91 try:
92 data = self._read_stack[0]
93 except IndexError:
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000094 self._extraneous_reads += 1
Antoine Pitrou19690592009-06-12 20:14:08 +000095 return 0
96 if data is None:
97 del self._read_stack[0]
98 return None
99 n = len(data)
100 if len(data) <= max_len:
101 del self._read_stack[0]
102 buf[:n] = data
103 return n
104 else:
105 buf[:] = data[:max_len]
106 self._read_stack[0] = data[max_len:]
107 return max_len
108
109 def truncate(self, pos=None):
110 return pos
111
Antoine Pitrou6391b342010-09-14 18:48:19 +0000112class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
113 pass
114
115class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
116 pass
117
118
119class MockRawIO(MockRawIOWithoutRead):
120
121 def read(self, n=None):
122 self._reads += 1
123 try:
124 return self._read_stack.pop(0)
125 except:
126 self._extraneous_reads += 1
127 return b""
128
Antoine Pitrou19690592009-06-12 20:14:08 +0000129class CMockRawIO(MockRawIO, io.RawIOBase):
130 pass
131
132class PyMockRawIO(MockRawIO, pyio.RawIOBase):
133 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000134
135
Antoine Pitrou19690592009-06-12 20:14:08 +0000136class MisbehavedRawIO(MockRawIO):
137 def write(self, b):
138 return MockRawIO.write(self, b) * 2
139
140 def read(self, n=None):
141 return MockRawIO.read(self, n) * 2
142
143 def seek(self, pos, whence):
144 return -123
145
146 def tell(self):
147 return -456
148
149 def readinto(self, buf):
150 MockRawIO.readinto(self, buf)
151 return len(buf) * 5
152
153class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
154 pass
155
156class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
157 pass
158
159
160class CloseFailureIO(MockRawIO):
161 closed = 0
162
163 def close(self):
164 if not self.closed:
165 self.closed = 1
166 raise IOError
167
168class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
169 pass
170
171class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
172 pass
173
174
175class MockFileIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000176
177 def __init__(self, data):
178 self.read_history = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000179 super(MockFileIO, self).__init__(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000180
181 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +0000182 res = super(MockFileIO, self).read(n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000183 self.read_history.append(None if res is None else len(res))
184 return res
185
Antoine Pitrou19690592009-06-12 20:14:08 +0000186 def readinto(self, b):
187 res = super(MockFileIO, self).readinto(b)
188 self.read_history.append(res)
189 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000190
Antoine Pitrou19690592009-06-12 20:14:08 +0000191class CMockFileIO(MockFileIO, io.BytesIO):
192 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000193
Antoine Pitrou19690592009-06-12 20:14:08 +0000194class PyMockFileIO(MockFileIO, pyio.BytesIO):
195 pass
196
197
198class MockNonBlockWriterIO:
199
200 def __init__(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000201 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000202 self._blocker_char = None
Christian Heimes1a6387e2008-03-26 12:49:49 +0000203
Antoine Pitrou19690592009-06-12 20:14:08 +0000204 def pop_written(self):
205 s = b"".join(self._write_stack)
206 self._write_stack[:] = []
207 return s
208
209 def block_on(self, char):
210 """Block when a given char is encountered."""
211 self._blocker_char = char
212
213 def readable(self):
214 return True
215
216 def seekable(self):
217 return True
Christian Heimes1a6387e2008-03-26 12:49:49 +0000218
219 def writable(self):
220 return True
221
Antoine Pitrou19690592009-06-12 20:14:08 +0000222 def write(self, b):
223 b = bytes(b)
224 n = -1
225 if self._blocker_char:
226 try:
227 n = b.index(self._blocker_char)
228 except ValueError:
229 pass
230 else:
231 self._blocker_char = None
232 self._write_stack.append(b[:n])
233 raise self.BlockingIOError(0, "test blocking", n)
234 self._write_stack.append(b)
235 return len(b)
236
237class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
238 BlockingIOError = io.BlockingIOError
239
240class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
241 BlockingIOError = pyio.BlockingIOError
242
Christian Heimes1a6387e2008-03-26 12:49:49 +0000243
244class IOTest(unittest.TestCase):
245
Antoine Pitrou19690592009-06-12 20:14:08 +0000246 def setUp(self):
247 support.unlink(support.TESTFN)
248
Christian Heimes1a6387e2008-03-26 12:49:49 +0000249 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000250 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000251
252 def write_ops(self, f):
253 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000254 f.truncate(0)
255 self.assertEqual(f.tell(), 5)
256 f.seek(0)
257
258 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000259 self.assertEqual(f.seek(0), 0)
260 self.assertEqual(f.write(b"Hello."), 6)
261 self.assertEqual(f.tell(), 6)
262 self.assertEqual(f.seek(-1, 1), 5)
263 self.assertEqual(f.tell(), 5)
264 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
265 self.assertEqual(f.seek(0), 0)
266 self.assertEqual(f.write(b"h"), 1)
267 self.assertEqual(f.seek(-1, 2), 13)
268 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000269
Christian Heimes1a6387e2008-03-26 12:49:49 +0000270 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000271 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000272 self.assertRaises(TypeError, f.seek, 0.0)
273
274 def read_ops(self, f, buffered=False):
275 data = f.read(5)
276 self.assertEqual(data, b"hello")
277 data = bytearray(data)
278 self.assertEqual(f.readinto(data), 5)
279 self.assertEqual(data, b" worl")
280 self.assertEqual(f.readinto(data), 2)
281 self.assertEqual(len(data), 5)
282 self.assertEqual(data[:2], b"d\n")
283 self.assertEqual(f.seek(0), 0)
284 self.assertEqual(f.read(20), b"hello world\n")
285 self.assertEqual(f.read(1), b"")
286 self.assertEqual(f.readinto(bytearray(b"x")), 0)
287 self.assertEqual(f.seek(-6, 2), 6)
288 self.assertEqual(f.read(5), b"world")
289 self.assertEqual(f.read(0), b"")
290 self.assertEqual(f.readinto(bytearray()), 0)
291 self.assertEqual(f.seek(-6, 1), 5)
292 self.assertEqual(f.read(5), b" worl")
293 self.assertEqual(f.tell(), 10)
294 self.assertRaises(TypeError, f.seek, 0.0)
295 if buffered:
296 f.seek(0)
297 self.assertEqual(f.read(), b"hello world\n")
298 f.seek(6)
299 self.assertEqual(f.read(), b"world\n")
300 self.assertEqual(f.read(), b"")
301
302 LARGE = 2**31
303
304 def large_file_ops(self, f):
305 assert f.readable()
306 assert f.writable()
307 self.assertEqual(f.seek(self.LARGE), self.LARGE)
308 self.assertEqual(f.tell(), self.LARGE)
309 self.assertEqual(f.write(b"xxx"), 3)
310 self.assertEqual(f.tell(), self.LARGE + 3)
311 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
312 self.assertEqual(f.truncate(), self.LARGE + 2)
313 self.assertEqual(f.tell(), self.LARGE + 2)
314 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
315 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000316 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000317 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
318 self.assertEqual(f.seek(-1, 2), self.LARGE)
319 self.assertEqual(f.read(2), b"x")
320
Antoine Pitrou19690592009-06-12 20:14:08 +0000321 def test_invalid_operations(self):
322 # Try writing on a file opened in read mode and vice-versa.
323 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000324 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000325 self.assertRaises(IOError, fp.read)
326 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000327 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000328 self.assertRaises(IOError, fp.write, b"blah")
329 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000330 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000331 self.assertRaises(IOError, fp.write, "blah")
332 self.assertRaises(IOError, fp.writelines, ["blah\n"])
333
Christian Heimes1a6387e2008-03-26 12:49:49 +0000334 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000335 with self.open(support.TESTFN, "wb", buffering=0) as f:
336 self.assertEqual(f.readable(), False)
337 self.assertEqual(f.writable(), True)
338 self.assertEqual(f.seekable(), True)
339 self.write_ops(f)
340 with self.open(support.TESTFN, "rb", buffering=0) as f:
341 self.assertEqual(f.readable(), True)
342 self.assertEqual(f.writable(), False)
343 self.assertEqual(f.seekable(), True)
344 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000345
346 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000347 with self.open(support.TESTFN, "wb") as f:
348 self.assertEqual(f.readable(), False)
349 self.assertEqual(f.writable(), True)
350 self.assertEqual(f.seekable(), True)
351 self.write_ops(f)
352 with self.open(support.TESTFN, "rb") as f:
353 self.assertEqual(f.readable(), True)
354 self.assertEqual(f.writable(), False)
355 self.assertEqual(f.seekable(), True)
356 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000357
358 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000359 with self.open(support.TESTFN, "wb") as f:
360 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
361 with self.open(support.TESTFN, "rb") as f:
362 self.assertEqual(f.readline(), b"abc\n")
363 self.assertEqual(f.readline(10), b"def\n")
364 self.assertEqual(f.readline(2), b"xy")
365 self.assertEqual(f.readline(4), b"zzy\n")
366 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000367 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000368 self.assertRaises(TypeError, f.readline, 5.3)
369 with self.open(support.TESTFN, "r") as f:
370 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000371
372 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000373 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000374 self.write_ops(f)
375 data = f.getvalue()
376 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000377 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000378 self.read_ops(f, True)
379
380 def test_large_file_ops(self):
381 # On Windows and Mac OSX this test comsumes large resources; It takes
382 # a long time to build the >2GB file and takes >2GB of disk space
383 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000384 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
385 if not support.is_resource_enabled("largefile"):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000386 print("\nTesting large file ops skipped on %s." % sys.platform,
387 file=sys.stderr)
388 print("It requires %d bytes and a long time." % self.LARGE,
389 file=sys.stderr)
390 print("Use 'regrtest.py -u largefile test_io' to run it.",
391 file=sys.stderr)
392 return
Antoine Pitrou19690592009-06-12 20:14:08 +0000393 with self.open(support.TESTFN, "w+b", 0) as f:
394 self.large_file_ops(f)
395 with self.open(support.TESTFN, "w+b") as f:
396 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000397
398 def test_with_open(self):
399 for bufsize in (0, 1, 100):
400 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000401 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000402 f.write(b"xxx")
403 self.assertEqual(f.closed, True)
404 f = None
405 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000406 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000407 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000408 except ZeroDivisionError:
409 self.assertEqual(f.closed, True)
410 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000411 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000412
Antoine Pitroue741cc62009-01-21 00:45:36 +0000413 # issue 5008
414 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000415 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000416 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000417 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000418 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000419 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000420 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000421 with self.open(support.TESTFN, "a") as f:
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000422 self.assertTrue(f.tell() > 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000423
Christian Heimes1a6387e2008-03-26 12:49:49 +0000424 def test_destructor(self):
425 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000426 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000427 def __del__(self):
428 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000429 try:
430 f = super(MyFileIO, self).__del__
431 except AttributeError:
432 pass
433 else:
434 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000435 def close(self):
436 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000437 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000438 def flush(self):
439 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000440 super(MyFileIO, self).flush()
441 f = MyFileIO(support.TESTFN, "wb")
442 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000443 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000444 support.gc_collect()
445 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000446 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000447 self.assertEqual(f.read(), b"xxx")
448
449 def _check_base_destructor(self, base):
450 record = []
451 class MyIO(base):
452 def __init__(self):
453 # This exercises the availability of attributes on object
454 # destruction.
455 # (in the C version, close() is called by the tp_dealloc
456 # function, not by __del__)
457 self.on_del = 1
458 self.on_close = 2
459 self.on_flush = 3
460 def __del__(self):
461 record.append(self.on_del)
462 try:
463 f = super(MyIO, self).__del__
464 except AttributeError:
465 pass
466 else:
467 f()
468 def close(self):
469 record.append(self.on_close)
470 super(MyIO, self).close()
471 def flush(self):
472 record.append(self.on_flush)
473 super(MyIO, self).flush()
474 f = MyIO()
475 del f
476 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000477 self.assertEqual(record, [1, 2, 3])
478
Antoine Pitrou19690592009-06-12 20:14:08 +0000479 def test_IOBase_destructor(self):
480 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000481
Antoine Pitrou19690592009-06-12 20:14:08 +0000482 def test_RawIOBase_destructor(self):
483 self._check_base_destructor(self.RawIOBase)
484
485 def test_BufferedIOBase_destructor(self):
486 self._check_base_destructor(self.BufferedIOBase)
487
488 def test_TextIOBase_destructor(self):
489 self._check_base_destructor(self.TextIOBase)
490
491 def test_close_flushes(self):
492 with self.open(support.TESTFN, "wb") as f:
493 f.write(b"xxx")
494 with self.open(support.TESTFN, "rb") as f:
495 self.assertEqual(f.read(), b"xxx")
496
497 def test_array_writes(self):
498 a = array.array(b'i', range(10))
499 n = len(a.tostring())
500 with self.open(support.TESTFN, "wb", 0) as f:
501 self.assertEqual(f.write(a), n)
502 with self.open(support.TESTFN, "wb") as f:
503 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000504
505 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000506 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000507 closefd=False)
508
Antoine Pitrou19690592009-06-12 20:14:08 +0000509 def test_read_closed(self):
510 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000511 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000512 with self.open(support.TESTFN, "r") as f:
513 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000514 self.assertEqual(file.read(), "egg\n")
515 file.seek(0)
516 file.close()
517 self.assertRaises(ValueError, file.read)
518
519 def test_no_closefd_with_filename(self):
520 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000521 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000522
523 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000524 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000525 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000526 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000527 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000528 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000529 self.assertEqual(file.buffer.raw.closefd, False)
530
Antoine Pitrou19690592009-06-12 20:14:08 +0000531 def test_garbage_collection(self):
532 # FileIO objects are collected, and collecting them flushes
533 # all data to disk.
534 f = self.FileIO(support.TESTFN, "wb")
535 f.write(b"abcxxx")
536 f.f = f
537 wr = weakref.ref(f)
538 del f
539 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000540 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000541 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000542 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000543
Antoine Pitrou19690592009-06-12 20:14:08 +0000544 def test_unbounded_file(self):
545 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
546 zero = "/dev/zero"
547 if not os.path.exists(zero):
548 self.skipTest("{0} does not exist".format(zero))
549 if sys.maxsize > 0x7FFFFFFF:
550 self.skipTest("test can only run in a 32-bit address space")
551 if support.real_max_memuse < support._2G:
552 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000553 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000554 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000555 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000556 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000557 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000558 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000559
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000560 def test_flush_error_on_close(self):
561 f = self.open(support.TESTFN, "wb", buffering=0)
562 def bad_flush():
563 raise IOError()
564 f.flush = bad_flush
565 self.assertRaises(IOError, f.close) # exception not swallowed
566
567 def test_multi_close(self):
568 f = self.open(support.TESTFN, "wb", buffering=0)
569 f.close()
570 f.close()
571 f.close()
572 self.assertRaises(ValueError, f.flush)
573
Antoine Pitrou6391b342010-09-14 18:48:19 +0000574 def test_RawIOBase_read(self):
575 # Exercise the default RawIOBase.read() implementation (which calls
576 # readinto() internally).
577 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
578 self.assertEqual(rawio.read(2), b"ab")
579 self.assertEqual(rawio.read(2), b"c")
580 self.assertEqual(rawio.read(2), b"d")
581 self.assertEqual(rawio.read(2), None)
582 self.assertEqual(rawio.read(2), b"ef")
583 self.assertEqual(rawio.read(2), b"g")
584 self.assertEqual(rawio.read(2), None)
585 self.assertEqual(rawio.read(2), b"")
586
Antoine Pitrou19690592009-06-12 20:14:08 +0000587class CIOTest(IOTest):
Antoine Pitrou16166452011-07-12 22:04:20 +0200588
589 def test_IOBase_finalize(self):
590 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
591 # class which inherits IOBase and an object of this class are caught
592 # in a reference cycle and close() is already in the method cache.
593 class MyIO(self.IOBase):
594 def close(self):
595 pass
596
597 # create an instance to populate the method cache
598 MyIO()
599 obj = MyIO()
600 obj.obj = obj
601 wr = weakref.ref(obj)
602 del MyIO
603 del obj
604 support.gc_collect()
605 self.assertTrue(wr() is None, wr)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000606
Antoine Pitrou19690592009-06-12 20:14:08 +0000607class PyIOTest(IOTest):
608 test_array_writes = unittest.skip(
609 "len(array.array) returns number of elements rather than bytelength"
610 )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000611
612
Antoine Pitrou19690592009-06-12 20:14:08 +0000613class CommonBufferedTests:
614 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
615
616 def test_detach(self):
617 raw = self.MockRawIO()
618 buf = self.tp(raw)
619 self.assertIs(buf.detach(), raw)
620 self.assertRaises(ValueError, buf.detach)
621
622 def test_fileno(self):
623 rawio = self.MockRawIO()
624 bufio = self.tp(rawio)
625
Ezio Melotti2623a372010-11-21 13:34:58 +0000626 self.assertEqual(42, bufio.fileno())
Antoine Pitrou19690592009-06-12 20:14:08 +0000627
628 def test_no_fileno(self):
629 # XXX will we always have fileno() function? If so, kill
630 # this test. Else, write it.
631 pass
632
633 def test_invalid_args(self):
634 rawio = self.MockRawIO()
635 bufio = self.tp(rawio)
636 # Invalid whence
637 self.assertRaises(ValueError, bufio.seek, 0, -1)
638 self.assertRaises(ValueError, bufio.seek, 0, 3)
639
640 def test_override_destructor(self):
641 tp = self.tp
642 record = []
643 class MyBufferedIO(tp):
644 def __del__(self):
645 record.append(1)
646 try:
647 f = super(MyBufferedIO, self).__del__
648 except AttributeError:
649 pass
650 else:
651 f()
652 def close(self):
653 record.append(2)
654 super(MyBufferedIO, self).close()
655 def flush(self):
656 record.append(3)
657 super(MyBufferedIO, self).flush()
658 rawio = self.MockRawIO()
659 bufio = MyBufferedIO(rawio)
660 writable = bufio.writable()
661 del bufio
662 support.gc_collect()
663 if writable:
664 self.assertEqual(record, [1, 2, 3])
665 else:
666 self.assertEqual(record, [1, 2])
667
668 def test_context_manager(self):
669 # Test usability as a context manager
670 rawio = self.MockRawIO()
671 bufio = self.tp(rawio)
672 def _with():
673 with bufio:
674 pass
675 _with()
676 # bufio should now be closed, and using it a second time should raise
677 # a ValueError.
678 self.assertRaises(ValueError, _with)
679
680 def test_error_through_destructor(self):
681 # Test that the exception state is not modified by a destructor,
682 # even if close() fails.
683 rawio = self.CloseFailureIO()
684 def f():
685 self.tp(rawio).xyzzy
686 with support.captured_output("stderr") as s:
687 self.assertRaises(AttributeError, f)
688 s = s.getvalue().strip()
689 if s:
690 # The destructor *may* have printed an unraisable error, check it
691 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000692 self.assertTrue(s.startswith("Exception IOError: "), s)
693 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000694
695 def test_repr(self):
696 raw = self.MockRawIO()
697 b = self.tp(raw)
698 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
699 self.assertEqual(repr(b), "<%s>" % clsname)
700 raw.name = "dummy"
701 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
702 raw.name = b"dummy"
703 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000704
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000705 def test_flush_error_on_close(self):
706 raw = self.MockRawIO()
707 def bad_flush():
708 raise IOError()
709 raw.flush = bad_flush
710 b = self.tp(raw)
711 self.assertRaises(IOError, b.close) # exception not swallowed
712
713 def test_multi_close(self):
714 raw = self.MockRawIO()
715 b = self.tp(raw)
716 b.close()
717 b.close()
718 b.close()
719 self.assertRaises(ValueError, b.flush)
720
Antoine Pitroufc9ead62010-12-21 21:26:55 +0000721 def test_readonly_attributes(self):
722 raw = self.MockRawIO()
723 buf = self.tp(raw)
724 x = self.MockRawIO()
725 with self.assertRaises((AttributeError, TypeError)):
726 buf.raw = x
727
Christian Heimes1a6387e2008-03-26 12:49:49 +0000728
Antoine Pitrou19690592009-06-12 20:14:08 +0000729class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
730 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000731
Antoine Pitrou19690592009-06-12 20:14:08 +0000732 def test_constructor(self):
733 rawio = self.MockRawIO([b"abc"])
734 bufio = self.tp(rawio)
735 bufio.__init__(rawio)
736 bufio.__init__(rawio, buffer_size=1024)
737 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000738 self.assertEqual(b"abc", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000739 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
740 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
741 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
742 rawio = self.MockRawIO([b"abc"])
743 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000744 self.assertEqual(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000745
Antoine Pitrou19690592009-06-12 20:14:08 +0000746 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000747 for arg in (None, 7):
748 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
749 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000750 self.assertEqual(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000751 # Invalid args
752 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000753
Antoine Pitrou19690592009-06-12 20:14:08 +0000754 def test_read1(self):
755 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
756 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000757 self.assertEqual(b"a", bufio.read(1))
758 self.assertEqual(b"b", bufio.read1(1))
759 self.assertEqual(rawio._reads, 1)
760 self.assertEqual(b"c", bufio.read1(100))
761 self.assertEqual(rawio._reads, 1)
762 self.assertEqual(b"d", bufio.read1(100))
763 self.assertEqual(rawio._reads, 2)
764 self.assertEqual(b"efg", bufio.read1(100))
765 self.assertEqual(rawio._reads, 3)
766 self.assertEqual(b"", bufio.read1(100))
767 self.assertEqual(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000768 # Invalid args
769 self.assertRaises(ValueError, bufio.read1, -1)
770
771 def test_readinto(self):
772 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
773 bufio = self.tp(rawio)
774 b = bytearray(2)
Ezio Melotti2623a372010-11-21 13:34:58 +0000775 self.assertEqual(bufio.readinto(b), 2)
776 self.assertEqual(b, b"ab")
777 self.assertEqual(bufio.readinto(b), 2)
778 self.assertEqual(b, b"cd")
779 self.assertEqual(bufio.readinto(b), 2)
780 self.assertEqual(b, b"ef")
781 self.assertEqual(bufio.readinto(b), 1)
782 self.assertEqual(b, b"gf")
783 self.assertEqual(bufio.readinto(b), 0)
784 self.assertEqual(b, b"gf")
Antoine Pitrou19690592009-06-12 20:14:08 +0000785
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000786 def test_readlines(self):
787 def bufio():
788 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
789 return self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000790 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
791 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
792 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000793
Antoine Pitrou19690592009-06-12 20:14:08 +0000794 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000795 data = b"abcdefghi"
796 dlen = len(data)
797
798 tests = [
799 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
800 [ 100, [ 3, 3, 3], [ dlen ] ],
801 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
802 ]
803
804 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000805 rawio = self.MockFileIO(data)
806 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000807 pos = 0
808 for nbytes in buf_read_sizes:
Ezio Melotti2623a372010-11-21 13:34:58 +0000809 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Christian Heimes1a6387e2008-03-26 12:49:49 +0000810 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000811 # this is mildly implementation-dependent
Ezio Melotti2623a372010-11-21 13:34:58 +0000812 self.assertEqual(rawio.read_history, raw_read_sizes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000813
Antoine Pitrou19690592009-06-12 20:14:08 +0000814 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000815 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000816 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
817 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000818 self.assertEqual(b"abcd", bufio.read(6))
819 self.assertEqual(b"e", bufio.read(1))
820 self.assertEqual(b"fg", bufio.read())
821 self.assertEqual(b"", bufio.peek(1))
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200822 self.assertIsNone(bufio.read())
Ezio Melotti2623a372010-11-21 13:34:58 +0000823 self.assertEqual(b"", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000824
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200825 rawio = self.MockRawIO((b"a", None, None))
826 self.assertEqual(b"a", rawio.readall())
827 self.assertIsNone(rawio.readall())
828
Antoine Pitrou19690592009-06-12 20:14:08 +0000829 def test_read_past_eof(self):
830 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
831 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000832
Ezio Melotti2623a372010-11-21 13:34:58 +0000833 self.assertEqual(b"abcdefg", bufio.read(9000))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000834
Antoine Pitrou19690592009-06-12 20:14:08 +0000835 def test_read_all(self):
836 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
837 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000838
Ezio Melotti2623a372010-11-21 13:34:58 +0000839 self.assertEqual(b"abcdefg", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000840
Victor Stinner6a102812010-04-27 23:55:59 +0000841 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +0000842 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +0000843 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000844 try:
845 # Write out many bytes with exactly the same number of 0's,
846 # 1's... 255's. This will help us check that concurrent reading
847 # doesn't duplicate or forget contents.
848 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +0000849 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000850 random.shuffle(l)
851 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000852 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000853 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000854 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000855 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000856 errors = []
857 results = []
858 def f():
859 try:
860 # Intra-buffer read then buffer-flushing read
861 for n in cycle([1, 19]):
862 s = bufio.read(n)
863 if not s:
864 break
865 # list.append() is atomic
866 results.append(s)
867 except Exception as e:
868 errors.append(e)
869 raise
870 threads = [threading.Thread(target=f) for x in range(20)]
871 for t in threads:
872 t.start()
873 time.sleep(0.02) # yield
874 for t in threads:
875 t.join()
876 self.assertFalse(errors,
877 "the following exceptions were caught: %r" % errors)
878 s = b''.join(results)
879 for i in range(256):
880 c = bytes(bytearray([i]))
881 self.assertEqual(s.count(c), N)
882 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +0000883 support.unlink(support.TESTFN)
884
885 def test_misbehaved_io(self):
886 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
887 bufio = self.tp(rawio)
888 self.assertRaises(IOError, bufio.seek, 0)
889 self.assertRaises(IOError, bufio.tell)
890
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000891 def test_no_extraneous_read(self):
892 # Issue #9550; when the raw IO object has satisfied the read request,
893 # we should not issue any additional reads, otherwise it may block
894 # (e.g. socket).
895 bufsize = 16
896 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
897 rawio = self.MockRawIO([b"x" * n])
898 bufio = self.tp(rawio, bufsize)
899 self.assertEqual(bufio.read(n), b"x" * n)
900 # Simple case: one raw read is enough to satisfy the request.
901 self.assertEqual(rawio._extraneous_reads, 0,
902 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
903 # A more complex case where two raw reads are needed to satisfy
904 # the request.
905 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
906 bufio = self.tp(rawio, bufsize)
907 self.assertEqual(bufio.read(n), b"x" * n)
908 self.assertEqual(rawio._extraneous_reads, 0,
909 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
910
911
Antoine Pitrou19690592009-06-12 20:14:08 +0000912class CBufferedReaderTest(BufferedReaderTest):
913 tp = io.BufferedReader
914
915 def test_constructor(self):
916 BufferedReaderTest.test_constructor(self)
917 # The allocation can succeed on 32-bit builds, e.g. with more
918 # than 2GB RAM and a 64-bit kernel.
919 if sys.maxsize > 0x7FFFFFFF:
920 rawio = self.MockRawIO()
921 bufio = self.tp(rawio)
922 self.assertRaises((OverflowError, MemoryError, ValueError),
923 bufio.__init__, rawio, sys.maxsize)
924
925 def test_initialization(self):
926 rawio = self.MockRawIO([b"abc"])
927 bufio = self.tp(rawio)
928 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
929 self.assertRaises(ValueError, bufio.read)
930 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
931 self.assertRaises(ValueError, bufio.read)
932 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
933 self.assertRaises(ValueError, bufio.read)
934
935 def test_misbehaved_io_read(self):
936 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
937 bufio = self.tp(rawio)
938 # _pyio.BufferedReader seems to implement reading different, so that
939 # checking this is not so easy.
940 self.assertRaises(IOError, bufio.read, 10)
941
942 def test_garbage_collection(self):
943 # C BufferedReader objects are collected.
944 # The Python version has __del__, so it ends into gc.garbage instead
945 rawio = self.FileIO(support.TESTFN, "w+b")
946 f = self.tp(rawio)
947 f.f = f
948 wr = weakref.ref(f)
949 del f
950 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000951 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +0000952
953class PyBufferedReaderTest(BufferedReaderTest):
954 tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000955
956
Antoine Pitrou19690592009-06-12 20:14:08 +0000957class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
958 write_mode = "wb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000959
Antoine Pitrou19690592009-06-12 20:14:08 +0000960 def test_constructor(self):
961 rawio = self.MockRawIO()
962 bufio = self.tp(rawio)
963 bufio.__init__(rawio)
964 bufio.__init__(rawio, buffer_size=1024)
965 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000966 self.assertEqual(3, bufio.write(b"abc"))
Antoine Pitrou19690592009-06-12 20:14:08 +0000967 bufio.flush()
968 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
969 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
970 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
971 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000972 self.assertEqual(3, bufio.write(b"ghi"))
Antoine Pitrou19690592009-06-12 20:14:08 +0000973 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +0000974 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000975
Antoine Pitrou19690592009-06-12 20:14:08 +0000976 def test_detach_flush(self):
977 raw = self.MockRawIO()
978 buf = self.tp(raw)
979 buf.write(b"howdy!")
980 self.assertFalse(raw._write_stack)
981 buf.detach()
982 self.assertEqual(raw._write_stack, [b"howdy!"])
983
984 def test_write(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000985 # Write to the buffered IO but don't overflow the buffer.
Antoine Pitrou19690592009-06-12 20:14:08 +0000986 writer = self.MockRawIO()
987 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000988 bufio.write(b"abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000989 self.assertFalse(writer._write_stack)
990
Antoine Pitrou19690592009-06-12 20:14:08 +0000991 def test_write_overflow(self):
992 writer = self.MockRawIO()
993 bufio = self.tp(writer, 8)
994 contents = b"abcdefghijklmnop"
995 for n in range(0, len(contents), 3):
996 bufio.write(contents[n:n+3])
997 flushed = b"".join(writer._write_stack)
998 # At least (total - 8) bytes were implicitly flushed, perhaps more
999 # depending on the implementation.
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001000 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001001
Antoine Pitrou19690592009-06-12 20:14:08 +00001002 def check_writes(self, intermediate_func):
1003 # Lots of writes, test the flushed output is as expected.
1004 contents = bytes(range(256)) * 1000
1005 n = 0
1006 writer = self.MockRawIO()
1007 bufio = self.tp(writer, 13)
1008 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1009 def gen_sizes():
1010 for size in count(1):
1011 for i in range(15):
1012 yield size
1013 sizes = gen_sizes()
1014 while n < len(contents):
1015 size = min(next(sizes), len(contents) - n)
Ezio Melotti2623a372010-11-21 13:34:58 +00001016 self.assertEqual(bufio.write(contents[n:n+size]), size)
Antoine Pitrou19690592009-06-12 20:14:08 +00001017 intermediate_func(bufio)
1018 n += size
1019 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001020 self.assertEqual(contents,
Antoine Pitrou19690592009-06-12 20:14:08 +00001021 b"".join(writer._write_stack))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001022
Antoine Pitrou19690592009-06-12 20:14:08 +00001023 def test_writes(self):
1024 self.check_writes(lambda bufio: None)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001025
Antoine Pitrou19690592009-06-12 20:14:08 +00001026 def test_writes_and_flushes(self):
1027 self.check_writes(lambda bufio: bufio.flush())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001028
Antoine Pitrou19690592009-06-12 20:14:08 +00001029 def test_writes_and_seeks(self):
1030 def _seekabs(bufio):
1031 pos = bufio.tell()
1032 bufio.seek(pos + 1, 0)
1033 bufio.seek(pos - 1, 0)
1034 bufio.seek(pos, 0)
1035 self.check_writes(_seekabs)
1036 def _seekrel(bufio):
1037 pos = bufio.seek(0, 1)
1038 bufio.seek(+1, 1)
1039 bufio.seek(-1, 1)
1040 bufio.seek(pos, 0)
1041 self.check_writes(_seekrel)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001042
Antoine Pitrou19690592009-06-12 20:14:08 +00001043 def test_writes_and_truncates(self):
1044 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001045
Antoine Pitrou19690592009-06-12 20:14:08 +00001046 def test_write_non_blocking(self):
1047 raw = self.MockNonBlockWriterIO()
1048 bufio = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001049
Ezio Melotti2623a372010-11-21 13:34:58 +00001050 self.assertEqual(bufio.write(b"abcd"), 4)
1051 self.assertEqual(bufio.write(b"efghi"), 5)
Antoine Pitrou19690592009-06-12 20:14:08 +00001052 # 1 byte will be written, the rest will be buffered
1053 raw.block_on(b"k")
Ezio Melotti2623a372010-11-21 13:34:58 +00001054 self.assertEqual(bufio.write(b"jklmn"), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001055
Antoine Pitrou19690592009-06-12 20:14:08 +00001056 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1057 raw.block_on(b"0")
1058 try:
1059 bufio.write(b"opqrwxyz0123456789")
1060 except self.BlockingIOError as e:
1061 written = e.characters_written
1062 else:
1063 self.fail("BlockingIOError should have been raised")
Ezio Melotti2623a372010-11-21 13:34:58 +00001064 self.assertEqual(written, 16)
1065 self.assertEqual(raw.pop_written(),
Antoine Pitrou19690592009-06-12 20:14:08 +00001066 b"abcdefghijklmnopqrwxyz")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001067
Ezio Melotti2623a372010-11-21 13:34:58 +00001068 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Antoine Pitrou19690592009-06-12 20:14:08 +00001069 s = raw.pop_written()
1070 # Previously buffered bytes were flushed
1071 self.assertTrue(s.startswith(b"01234567A"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001072
Antoine Pitrou19690592009-06-12 20:14:08 +00001073 def test_write_and_rewind(self):
1074 raw = io.BytesIO()
1075 bufio = self.tp(raw, 4)
1076 self.assertEqual(bufio.write(b"abcdef"), 6)
1077 self.assertEqual(bufio.tell(), 6)
1078 bufio.seek(0, 0)
1079 self.assertEqual(bufio.write(b"XY"), 2)
1080 bufio.seek(6, 0)
1081 self.assertEqual(raw.getvalue(), b"XYcdef")
1082 self.assertEqual(bufio.write(b"123456"), 6)
1083 bufio.flush()
1084 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001085
Antoine Pitrou19690592009-06-12 20:14:08 +00001086 def test_flush(self):
1087 writer = self.MockRawIO()
1088 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001089 bufio.write(b"abc")
1090 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001091 self.assertEqual(b"abc", writer._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001092
Antoine Pitrou19690592009-06-12 20:14:08 +00001093 def test_destructor(self):
1094 writer = self.MockRawIO()
1095 bufio = self.tp(writer, 8)
1096 bufio.write(b"abc")
1097 del bufio
1098 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001099 self.assertEqual(b"abc", writer._write_stack[0])
Antoine Pitrou19690592009-06-12 20:14:08 +00001100
1101 def test_truncate(self):
1102 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001103 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001104 bufio = self.tp(raw, 8)
1105 bufio.write(b"abcdef")
1106 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001107 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001108 with self.open(support.TESTFN, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001109 self.assertEqual(f.read(), b"abc")
1110
Victor Stinner6a102812010-04-27 23:55:59 +00001111 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +00001112 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +00001113 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001114 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001115 # Write out many bytes from many threads and test they were
1116 # all flushed.
1117 N = 1000
1118 contents = bytes(range(256)) * N
1119 sizes = cycle([1, 19])
1120 n = 0
1121 queue = deque()
1122 while n < len(contents):
1123 size = next(sizes)
1124 queue.append(contents[n:n+size])
1125 n += size
1126 del contents
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001127 # We use a real file object because it allows us to
1128 # exercise situations where the GIL is released before
1129 # writing the buffer to the raw streams. This is in addition
1130 # to concurrency issues due to switching threads in the middle
1131 # of Python code.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001132 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001133 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001134 errors = []
1135 def f():
1136 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001137 while True:
1138 try:
1139 s = queue.popleft()
1140 except IndexError:
1141 return
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001142 bufio.write(s)
1143 except Exception as e:
1144 errors.append(e)
1145 raise
1146 threads = [threading.Thread(target=f) for x in range(20)]
1147 for t in threads:
1148 t.start()
1149 time.sleep(0.02) # yield
1150 for t in threads:
1151 t.join()
1152 self.assertFalse(errors,
1153 "the following exceptions were caught: %r" % errors)
Antoine Pitrou19690592009-06-12 20:14:08 +00001154 bufio.close()
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001155 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001156 s = f.read()
1157 for i in range(256):
Ezio Melotti2623a372010-11-21 13:34:58 +00001158 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001159 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001160 support.unlink(support.TESTFN)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001161
Antoine Pitrou19690592009-06-12 20:14:08 +00001162 def test_misbehaved_io(self):
1163 rawio = self.MisbehavedRawIO()
1164 bufio = self.tp(rawio, 5)
1165 self.assertRaises(IOError, bufio.seek, 0)
1166 self.assertRaises(IOError, bufio.tell)
1167 self.assertRaises(IOError, bufio.write, b"abcdef")
1168
1169 def test_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001170 with support.check_warnings(("max_buffer_size is deprecated",
1171 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001172 self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001173
1174
1175class CBufferedWriterTest(BufferedWriterTest):
1176 tp = io.BufferedWriter
1177
1178 def test_constructor(self):
1179 BufferedWriterTest.test_constructor(self)
1180 # The allocation can succeed on 32-bit builds, e.g. with more
1181 # than 2GB RAM and a 64-bit kernel.
1182 if sys.maxsize > 0x7FFFFFFF:
1183 rawio = self.MockRawIO()
1184 bufio = self.tp(rawio)
1185 self.assertRaises((OverflowError, MemoryError, ValueError),
1186 bufio.__init__, rawio, sys.maxsize)
1187
1188 def test_initialization(self):
1189 rawio = self.MockRawIO()
1190 bufio = self.tp(rawio)
1191 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1192 self.assertRaises(ValueError, bufio.write, b"def")
1193 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1194 self.assertRaises(ValueError, bufio.write, b"def")
1195 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1196 self.assertRaises(ValueError, bufio.write, b"def")
1197
1198 def test_garbage_collection(self):
1199 # C BufferedWriter objects are collected, and collecting them flushes
1200 # all data to disk.
1201 # The Python version has __del__, so it ends into gc.garbage instead
1202 rawio = self.FileIO(support.TESTFN, "w+b")
1203 f = self.tp(rawio)
1204 f.write(b"123xxx")
1205 f.x = f
1206 wr = weakref.ref(f)
1207 del f
1208 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001209 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001210 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001211 self.assertEqual(f.read(), b"123xxx")
1212
1213
1214class PyBufferedWriterTest(BufferedWriterTest):
1215 tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001216
1217class BufferedRWPairTest(unittest.TestCase):
1218
Antoine Pitrou19690592009-06-12 20:14:08 +00001219 def test_constructor(self):
1220 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001221 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001222
Antoine Pitrou19690592009-06-12 20:14:08 +00001223 def test_detach(self):
1224 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1225 self.assertRaises(self.UnsupportedOperation, pair.detach)
1226
1227 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001228 with support.check_warnings(("max_buffer_size is deprecated",
1229 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001230 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001231
1232 def test_constructor_with_not_readable(self):
1233 class NotReadable(MockRawIO):
1234 def readable(self):
1235 return False
1236
1237 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1238
1239 def test_constructor_with_not_writeable(self):
1240 class NotWriteable(MockRawIO):
1241 def writable(self):
1242 return False
1243
1244 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1245
1246 def test_read(self):
1247 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1248
1249 self.assertEqual(pair.read(3), b"abc")
1250 self.assertEqual(pair.read(1), b"d")
1251 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001252 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1253 self.assertEqual(pair.read(None), b"abc")
1254
1255 def test_readlines(self):
1256 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1257 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1258 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1259 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001260
1261 def test_read1(self):
1262 # .read1() is delegated to the underlying reader object, so this test
1263 # can be shallow.
1264 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1265
1266 self.assertEqual(pair.read1(3), b"abc")
1267
1268 def test_readinto(self):
1269 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1270
1271 data = bytearray(5)
1272 self.assertEqual(pair.readinto(data), 5)
1273 self.assertEqual(data, b"abcde")
1274
1275 def test_write(self):
1276 w = self.MockRawIO()
1277 pair = self.tp(self.MockRawIO(), w)
1278
1279 pair.write(b"abc")
1280 pair.flush()
1281 pair.write(b"def")
1282 pair.flush()
1283 self.assertEqual(w._write_stack, [b"abc", b"def"])
1284
1285 def test_peek(self):
1286 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1287
1288 self.assertTrue(pair.peek(3).startswith(b"abc"))
1289 self.assertEqual(pair.read(3), b"abc")
1290
1291 def test_readable(self):
1292 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1293 self.assertTrue(pair.readable())
1294
1295 def test_writeable(self):
1296 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1297 self.assertTrue(pair.writable())
1298
1299 def test_seekable(self):
1300 # BufferedRWPairs are never seekable, even if their readers and writers
1301 # are.
1302 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1303 self.assertFalse(pair.seekable())
1304
1305 # .flush() is delegated to the underlying writer object and has been
1306 # tested in the test_write method.
1307
1308 def test_close_and_closed(self):
1309 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1310 self.assertFalse(pair.closed)
1311 pair.close()
1312 self.assertTrue(pair.closed)
1313
1314 def test_isatty(self):
1315 class SelectableIsAtty(MockRawIO):
1316 def __init__(self, isatty):
1317 MockRawIO.__init__(self)
1318 self._isatty = isatty
1319
1320 def isatty(self):
1321 return self._isatty
1322
1323 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1324 self.assertFalse(pair.isatty())
1325
1326 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1327 self.assertTrue(pair.isatty())
1328
1329 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1330 self.assertTrue(pair.isatty())
1331
1332 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1333 self.assertTrue(pair.isatty())
1334
1335class CBufferedRWPairTest(BufferedRWPairTest):
1336 tp = io.BufferedRWPair
1337
1338class PyBufferedRWPairTest(BufferedRWPairTest):
1339 tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001340
1341
Antoine Pitrou19690592009-06-12 20:14:08 +00001342class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1343 read_mode = "rb+"
1344 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001345
Antoine Pitrou19690592009-06-12 20:14:08 +00001346 def test_constructor(self):
1347 BufferedReaderTest.test_constructor(self)
1348 BufferedWriterTest.test_constructor(self)
1349
1350 def test_read_and_write(self):
1351 raw = self.MockRawIO((b"asdf", b"ghjk"))
1352 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001353
1354 self.assertEqual(b"as", rw.read(2))
1355 rw.write(b"ddd")
1356 rw.write(b"eee")
1357 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001358 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001359 self.assertEqual(b"dddeee", raw._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001360
Antoine Pitrou19690592009-06-12 20:14:08 +00001361 def test_seek_and_tell(self):
1362 raw = self.BytesIO(b"asdfghjkl")
1363 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001364
Ezio Melotti2623a372010-11-21 13:34:58 +00001365 self.assertEqual(b"as", rw.read(2))
1366 self.assertEqual(2, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001367 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001368 self.assertEqual(b"asdf", rw.read(4))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001369
Antoine Pitrou808cec52011-08-20 15:40:58 +02001370 rw.write(b"123f")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001371 rw.seek(0, 0)
Antoine Pitrou808cec52011-08-20 15:40:58 +02001372 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001373 self.assertEqual(9, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001374 rw.seek(-4, 2)
Ezio Melotti2623a372010-11-21 13:34:58 +00001375 self.assertEqual(5, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001376 rw.seek(2, 1)
Ezio Melotti2623a372010-11-21 13:34:58 +00001377 self.assertEqual(7, rw.tell())
1378 self.assertEqual(b"fl", rw.read(11))
Antoine Pitrou808cec52011-08-20 15:40:58 +02001379 rw.flush()
1380 self.assertEqual(b"asdf123fl", raw.getvalue())
1381
Christian Heimes1a6387e2008-03-26 12:49:49 +00001382 self.assertRaises(TypeError, rw.seek, 0.0)
1383
Antoine Pitrou19690592009-06-12 20:14:08 +00001384 def check_flush_and_read(self, read_func):
1385 raw = self.BytesIO(b"abcdefghi")
1386 bufio = self.tp(raw)
1387
Ezio Melotti2623a372010-11-21 13:34:58 +00001388 self.assertEqual(b"ab", read_func(bufio, 2))
Antoine Pitrou19690592009-06-12 20:14:08 +00001389 bufio.write(b"12")
Ezio Melotti2623a372010-11-21 13:34:58 +00001390 self.assertEqual(b"ef", read_func(bufio, 2))
1391 self.assertEqual(6, bufio.tell())
Antoine Pitrou19690592009-06-12 20:14:08 +00001392 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001393 self.assertEqual(6, bufio.tell())
1394 self.assertEqual(b"ghi", read_func(bufio))
Antoine Pitrou19690592009-06-12 20:14:08 +00001395 raw.seek(0, 0)
1396 raw.write(b"XYZ")
1397 # flush() resets the read buffer
1398 bufio.flush()
1399 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001400 self.assertEqual(b"XYZ", read_func(bufio, 3))
Antoine Pitrou19690592009-06-12 20:14:08 +00001401
1402 def test_flush_and_read(self):
1403 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1404
1405 def test_flush_and_readinto(self):
1406 def _readinto(bufio, n=-1):
1407 b = bytearray(n if n >= 0 else 9999)
1408 n = bufio.readinto(b)
1409 return bytes(b[:n])
1410 self.check_flush_and_read(_readinto)
1411
1412 def test_flush_and_peek(self):
1413 def _peek(bufio, n=-1):
1414 # This relies on the fact that the buffer can contain the whole
1415 # raw stream, otherwise peek() can return less.
1416 b = bufio.peek(n)
1417 if n != -1:
1418 b = b[:n]
1419 bufio.seek(len(b), 1)
1420 return b
1421 self.check_flush_and_read(_peek)
1422
1423 def test_flush_and_write(self):
1424 raw = self.BytesIO(b"abcdefghi")
1425 bufio = self.tp(raw)
1426
1427 bufio.write(b"123")
1428 bufio.flush()
1429 bufio.write(b"45")
1430 bufio.flush()
1431 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001432 self.assertEqual(b"12345fghi", raw.getvalue())
1433 self.assertEqual(b"12345fghi", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +00001434
1435 def test_threads(self):
1436 BufferedReaderTest.test_threads(self)
1437 BufferedWriterTest.test_threads(self)
1438
1439 def test_writes_and_peek(self):
1440 def _peek(bufio):
1441 bufio.peek(1)
1442 self.check_writes(_peek)
1443 def _peek(bufio):
1444 pos = bufio.tell()
1445 bufio.seek(-1, 1)
1446 bufio.peek(1)
1447 bufio.seek(pos, 0)
1448 self.check_writes(_peek)
1449
1450 def test_writes_and_reads(self):
1451 def _read(bufio):
1452 bufio.seek(-1, 1)
1453 bufio.read(1)
1454 self.check_writes(_read)
1455
1456 def test_writes_and_read1s(self):
1457 def _read1(bufio):
1458 bufio.seek(-1, 1)
1459 bufio.read1(1)
1460 self.check_writes(_read1)
1461
1462 def test_writes_and_readintos(self):
1463 def _read(bufio):
1464 bufio.seek(-1, 1)
1465 bufio.readinto(bytearray(1))
1466 self.check_writes(_read)
1467
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001468 def test_write_after_readahead(self):
1469 # Issue #6629: writing after the buffer was filled by readahead should
1470 # first rewind the raw stream.
1471 for overwrite_size in [1, 5]:
1472 raw = self.BytesIO(b"A" * 10)
1473 bufio = self.tp(raw, 4)
1474 # Trigger readahead
1475 self.assertEqual(bufio.read(1), b"A")
1476 self.assertEqual(bufio.tell(), 1)
1477 # Overwriting should rewind the raw stream if it needs so
1478 bufio.write(b"B" * overwrite_size)
1479 self.assertEqual(bufio.tell(), overwrite_size + 1)
1480 # If the write size was smaller than the buffer size, flush() and
1481 # check that rewind happens.
1482 bufio.flush()
1483 self.assertEqual(bufio.tell(), overwrite_size + 1)
1484 s = raw.getvalue()
1485 self.assertEqual(s,
1486 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1487
Antoine Pitrouee46a7b2011-05-13 00:31:52 +02001488 def test_write_rewind_write(self):
1489 # Various combinations of reading / writing / seeking backwards / writing again
1490 def mutate(bufio, pos1, pos2):
1491 assert pos2 >= pos1
1492 # Fill the buffer
1493 bufio.seek(pos1)
1494 bufio.read(pos2 - pos1)
1495 bufio.write(b'\x02')
1496 # This writes earlier than the previous write, but still inside
1497 # the buffer.
1498 bufio.seek(pos1)
1499 bufio.write(b'\x01')
1500
1501 b = b"\x80\x81\x82\x83\x84"
1502 for i in range(0, len(b)):
1503 for j in range(i, len(b)):
1504 raw = self.BytesIO(b)
1505 bufio = self.tp(raw, 100)
1506 mutate(bufio, i, j)
1507 bufio.flush()
1508 expected = bytearray(b)
1509 expected[j] = 2
1510 expected[i] = 1
1511 self.assertEqual(raw.getvalue(), expected,
1512 "failed result for i=%d, j=%d" % (i, j))
1513
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001514 def test_truncate_after_read_or_write(self):
1515 raw = self.BytesIO(b"A" * 10)
1516 bufio = self.tp(raw, 100)
1517 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1518 self.assertEqual(bufio.truncate(), 2)
1519 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1520 self.assertEqual(bufio.truncate(), 4)
1521
Antoine Pitrou19690592009-06-12 20:14:08 +00001522 def test_misbehaved_io(self):
1523 BufferedReaderTest.test_misbehaved_io(self)
1524 BufferedWriterTest.test_misbehaved_io(self)
1525
Antoine Pitrou808cec52011-08-20 15:40:58 +02001526 def test_interleaved_read_write(self):
1527 # Test for issue #12213
1528 with self.BytesIO(b'abcdefgh') as raw:
1529 with self.tp(raw, 100) as f:
1530 f.write(b"1")
1531 self.assertEqual(f.read(1), b'b')
1532 f.write(b'2')
1533 self.assertEqual(f.read1(1), b'd')
1534 f.write(b'3')
1535 buf = bytearray(1)
1536 f.readinto(buf)
1537 self.assertEqual(buf, b'f')
1538 f.write(b'4')
1539 self.assertEqual(f.peek(1), b'h')
1540 f.flush()
1541 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1542
1543 with self.BytesIO(b'abc') as raw:
1544 with self.tp(raw, 100) as f:
1545 self.assertEqual(f.read(1), b'a')
1546 f.write(b"2")
1547 self.assertEqual(f.read(1), b'c')
1548 f.flush()
1549 self.assertEqual(raw.getvalue(), b'a2c')
1550
1551 def test_interleaved_readline_write(self):
1552 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1553 with self.tp(raw) as f:
1554 f.write(b'1')
1555 self.assertEqual(f.readline(), b'b\n')
1556 f.write(b'2')
1557 self.assertEqual(f.readline(), b'def\n')
1558 f.write(b'3')
1559 self.assertEqual(f.readline(), b'\n')
1560 f.flush()
1561 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1562
1563
Antoine Pitrou19690592009-06-12 20:14:08 +00001564class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
1565 tp = io.BufferedRandom
1566
1567 def test_constructor(self):
1568 BufferedRandomTest.test_constructor(self)
1569 # The allocation can succeed on 32-bit builds, e.g. with more
1570 # than 2GB RAM and a 64-bit kernel.
1571 if sys.maxsize > 0x7FFFFFFF:
1572 rawio = self.MockRawIO()
1573 bufio = self.tp(rawio)
1574 self.assertRaises((OverflowError, MemoryError, ValueError),
1575 bufio.__init__, rawio, sys.maxsize)
1576
1577 def test_garbage_collection(self):
1578 CBufferedReaderTest.test_garbage_collection(self)
1579 CBufferedWriterTest.test_garbage_collection(self)
1580
1581class PyBufferedRandomTest(BufferedRandomTest):
1582 tp = pyio.BufferedRandom
1583
1584
Christian Heimes1a6387e2008-03-26 12:49:49 +00001585# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1586# properties:
1587# - A single output character can correspond to many bytes of input.
1588# - The number of input bytes to complete the character can be
1589# undetermined until the last input byte is received.
1590# - The number of input bytes can vary depending on previous input.
1591# - A single input byte can correspond to many characters of output.
1592# - The number of output characters can be undetermined until the
1593# last input byte is received.
1594# - The number of output characters can vary depending on previous input.
1595
1596class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1597 """
1598 For testing seek/tell behavior with a stateful, buffering decoder.
1599
1600 Input is a sequence of words. Words may be fixed-length (length set
1601 by input) or variable-length (period-terminated). In variable-length
1602 mode, extra periods are ignored. Possible words are:
1603 - 'i' followed by a number sets the input length, I (maximum 99).
1604 When I is set to 0, words are space-terminated.
1605 - 'o' followed by a number sets the output length, O (maximum 99).
1606 - Any other word is converted into a word followed by a period on
1607 the output. The output word consists of the input word truncated
1608 or padded out with hyphens to make its length equal to O. If O
1609 is 0, the word is output verbatim without truncating or padding.
1610 I and O are initially set to 1. When I changes, any buffered input is
1611 re-scanned according to the new I. EOF also terminates the last word.
1612 """
1613
1614 def __init__(self, errors='strict'):
1615 codecs.IncrementalDecoder.__init__(self, errors)
1616 self.reset()
1617
1618 def __repr__(self):
1619 return '<SID %x>' % id(self)
1620
1621 def reset(self):
1622 self.i = 1
1623 self.o = 1
1624 self.buffer = bytearray()
1625
1626 def getstate(self):
1627 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1628 return bytes(self.buffer), i*100 + o
1629
1630 def setstate(self, state):
1631 buffer, io = state
1632 self.buffer = bytearray(buffer)
1633 i, o = divmod(io, 100)
1634 self.i, self.o = i ^ 1, o ^ 1
1635
1636 def decode(self, input, final=False):
1637 output = ''
1638 for b in input:
1639 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001640 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +00001641 if self.buffer:
1642 output += self.process_word()
1643 else:
1644 self.buffer.append(b)
1645 else: # fixed-length, terminate after self.i bytes
1646 self.buffer.append(b)
1647 if len(self.buffer) == self.i:
1648 output += self.process_word()
1649 if final and self.buffer: # EOF terminates the last word
1650 output += self.process_word()
1651 return output
1652
1653 def process_word(self):
1654 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001655 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001656 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001657 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001658 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1659 else:
1660 output = self.buffer.decode('ascii')
1661 if len(output) < self.o:
1662 output += '-'*self.o # pad out with hyphens
1663 if self.o:
1664 output = output[:self.o] # truncate to output length
1665 output += '.'
1666 self.buffer = bytearray()
1667 return output
1668
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001669 codecEnabled = False
1670
1671 @classmethod
1672 def lookupTestDecoder(cls, name):
1673 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001674 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001675 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001676 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001677 incrementalencoder=None,
1678 streamreader=None, streamwriter=None,
1679 incrementaldecoder=cls)
1680
1681# Register the previous decoder for testing.
1682# Disabled by default, tests will enable it.
1683codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1684
1685
Christian Heimes1a6387e2008-03-26 12:49:49 +00001686class StatefulIncrementalDecoderTest(unittest.TestCase):
1687 """
1688 Make sure the StatefulIncrementalDecoder actually works.
1689 """
1690
1691 test_cases = [
1692 # I=1, O=1 (fixed-length input == fixed-length output)
1693 (b'abcd', False, 'a.b.c.d.'),
1694 # I=0, O=0 (variable-length input, variable-length output)
1695 (b'oiabcd', True, 'abcd.'),
1696 # I=0, O=0 (should ignore extra periods)
1697 (b'oi...abcd...', True, 'abcd.'),
1698 # I=0, O=6 (variable-length input, fixed-length output)
1699 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1700 # I=2, O=6 (fixed-length input < fixed-length output)
1701 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1702 # I=6, O=3 (fixed-length input > fixed-length output)
1703 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1704 # I=0, then 3; O=29, then 15 (with longer output)
1705 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1706 'a----------------------------.' +
1707 'b----------------------------.' +
1708 'cde--------------------------.' +
1709 'abcdefghijabcde.' +
1710 'a.b------------.' +
1711 '.c.------------.' +
1712 'd.e------------.' +
1713 'k--------------.' +
1714 'l--------------.' +
1715 'm--------------.')
1716 ]
1717
Antoine Pitrou19690592009-06-12 20:14:08 +00001718 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001719 # Try a few one-shot test cases.
1720 for input, eof, output in self.test_cases:
1721 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001722 self.assertEqual(d.decode(input, eof), output)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001723
1724 # Also test an unfinished decode, followed by forcing EOF.
1725 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001726 self.assertEqual(d.decode(b'oiabcd'), '')
1727 self.assertEqual(d.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001728
1729class TextIOWrapperTest(unittest.TestCase):
1730
1731 def setUp(self):
1732 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1733 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00001734 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001735
1736 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001737 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001738
Antoine Pitrou19690592009-06-12 20:14:08 +00001739 def test_constructor(self):
1740 r = self.BytesIO(b"\xc3\xa9\n\n")
1741 b = self.BufferedReader(r, 1000)
1742 t = self.TextIOWrapper(b)
1743 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti2623a372010-11-21 13:34:58 +00001744 self.assertEqual(t.encoding, "latin1")
1745 self.assertEqual(t.line_buffering, False)
Antoine Pitrou19690592009-06-12 20:14:08 +00001746 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti2623a372010-11-21 13:34:58 +00001747 self.assertEqual(t.encoding, "utf8")
1748 self.assertEqual(t.line_buffering, True)
1749 self.assertEqual("\xe9\n", t.readline())
Antoine Pitrou19690592009-06-12 20:14:08 +00001750 self.assertRaises(TypeError, t.__init__, b, newline=42)
1751 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1752
1753 def test_detach(self):
1754 r = self.BytesIO()
1755 b = self.BufferedWriter(r)
1756 t = self.TextIOWrapper(b)
1757 self.assertIs(t.detach(), b)
1758
1759 t = self.TextIOWrapper(b, encoding="ascii")
1760 t.write("howdy")
1761 self.assertFalse(r.getvalue())
1762 t.detach()
1763 self.assertEqual(r.getvalue(), b"howdy")
1764 self.assertRaises(ValueError, t.detach)
1765
1766 def test_repr(self):
1767 raw = self.BytesIO("hello".encode("utf-8"))
1768 b = self.BufferedReader(raw)
1769 t = self.TextIOWrapper(b, encoding="utf-8")
1770 modname = self.TextIOWrapper.__module__
1771 self.assertEqual(repr(t),
1772 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1773 raw.name = "dummy"
1774 self.assertEqual(repr(t),
1775 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1776 raw.name = b"dummy"
1777 self.assertEqual(repr(t),
1778 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1779
1780 def test_line_buffering(self):
1781 r = self.BytesIO()
1782 b = self.BufferedWriter(r, 1000)
1783 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1784 t.write("X")
Ezio Melotti2623a372010-11-21 13:34:58 +00001785 self.assertEqual(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001786 t.write("Y\nZ")
Ezio Melotti2623a372010-11-21 13:34:58 +00001787 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001788 t.write("A\rB")
Ezio Melotti2623a372010-11-21 13:34:58 +00001789 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001790
Antoine Pitrou19690592009-06-12 20:14:08 +00001791 def test_encoding(self):
1792 # Check the encoding attribute is always set, and valid
1793 b = self.BytesIO()
1794 t = self.TextIOWrapper(b, encoding="utf8")
1795 self.assertEqual(t.encoding, "utf8")
1796 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001797 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001798 codecs.lookup(t.encoding)
1799
1800 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001801 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001802 b = self.BytesIO(b"abc\n\xff\n")
1803 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001804 self.assertRaises(UnicodeError, t.read)
1805 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001806 b = self.BytesIO(b"abc\n\xff\n")
1807 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001808 self.assertRaises(UnicodeError, t.read)
1809 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001810 b = self.BytesIO(b"abc\n\xff\n")
1811 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti2623a372010-11-21 13:34:58 +00001812 self.assertEqual(t.read(), "abc\n\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001813 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001814 b = self.BytesIO(b"abc\n\xff\n")
1815 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti2623a372010-11-21 13:34:58 +00001816 self.assertEqual(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001817
Antoine Pitrou19690592009-06-12 20:14:08 +00001818 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001819 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001820 b = self.BytesIO()
1821 t = self.TextIOWrapper(b, encoding="ascii")
1822 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001823 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001824 b = self.BytesIO()
1825 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1826 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001827 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001828 b = self.BytesIO()
1829 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001830 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001831 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001832 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001833 self.assertEqual(b.getvalue(), b"abcdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001834 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001835 b = self.BytesIO()
1836 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001837 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001838 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001839 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001840 self.assertEqual(b.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001841
Antoine Pitrou19690592009-06-12 20:14:08 +00001842 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001843 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1844
1845 tests = [
1846 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1847 [ '', input_lines ],
1848 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1849 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1850 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1851 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001852 encodings = (
1853 'utf-8', 'latin-1',
1854 'utf-16', 'utf-16-le', 'utf-16-be',
1855 'utf-32', 'utf-32-le', 'utf-32-be',
1856 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00001857
1858 # Try a range of buffer sizes to test the case where \r is the last
1859 # character in TextIOWrapper._pending_line.
1860 for encoding in encodings:
1861 # XXX: str.encode() should return bytes
1862 data = bytes(''.join(input_lines).encode(encoding))
1863 for do_reads in (False, True):
1864 for bufsize in range(1, 10):
1865 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001866 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1867 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00001868 encoding=encoding)
1869 if do_reads:
1870 got_lines = []
1871 while True:
1872 c2 = textio.read(2)
1873 if c2 == '':
1874 break
Ezio Melotti2623a372010-11-21 13:34:58 +00001875 self.assertEqual(len(c2), 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001876 got_lines.append(c2 + textio.readline())
1877 else:
1878 got_lines = list(textio)
1879
1880 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti2623a372010-11-21 13:34:58 +00001881 self.assertEqual(got_line, exp_line)
1882 self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001883
Antoine Pitrou19690592009-06-12 20:14:08 +00001884 def test_newlines_input(self):
1885 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001886 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1887 for newline, expected in [
1888 (None, normalized.decode("ascii").splitlines(True)),
1889 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00001890 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1891 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1892 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00001893 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00001894 buf = self.BytesIO(testdata)
1895 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti2623a372010-11-21 13:34:58 +00001896 self.assertEqual(txt.readlines(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001897 txt.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001898 self.assertEqual(txt.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001899
Antoine Pitrou19690592009-06-12 20:14:08 +00001900 def test_newlines_output(self):
1901 testdict = {
1902 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1903 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1904 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1905 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1906 }
1907 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1908 for newline, expected in tests:
1909 buf = self.BytesIO()
1910 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1911 txt.write("AAA\nB")
1912 txt.write("BB\nCCC\n")
1913 txt.write("X\rY\r\nZ")
1914 txt.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001915 self.assertEqual(buf.closed, False)
1916 self.assertEqual(buf.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00001917
1918 def test_destructor(self):
1919 l = []
1920 base = self.BytesIO
1921 class MyBytesIO(base):
1922 def close(self):
1923 l.append(self.getvalue())
1924 base.close(self)
1925 b = MyBytesIO()
1926 t = self.TextIOWrapper(b, encoding="ascii")
1927 t.write("abc")
1928 del t
1929 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001930 self.assertEqual([b"abc"], l)
Antoine Pitrou19690592009-06-12 20:14:08 +00001931
1932 def test_override_destructor(self):
1933 record = []
1934 class MyTextIO(self.TextIOWrapper):
1935 def __del__(self):
1936 record.append(1)
1937 try:
1938 f = super(MyTextIO, self).__del__
1939 except AttributeError:
1940 pass
1941 else:
1942 f()
1943 def close(self):
1944 record.append(2)
1945 super(MyTextIO, self).close()
1946 def flush(self):
1947 record.append(3)
1948 super(MyTextIO, self).flush()
1949 b = self.BytesIO()
1950 t = MyTextIO(b, encoding="ascii")
1951 del t
1952 support.gc_collect()
1953 self.assertEqual(record, [1, 2, 3])
1954
1955 def test_error_through_destructor(self):
1956 # Test that the exception state is not modified by a destructor,
1957 # even if close() fails.
1958 rawio = self.CloseFailureIO()
1959 def f():
1960 self.TextIOWrapper(rawio).xyzzy
1961 with support.captured_output("stderr") as s:
1962 self.assertRaises(AttributeError, f)
1963 s = s.getvalue().strip()
1964 if s:
1965 # The destructor *may* have printed an unraisable error, check it
1966 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001967 self.assertTrue(s.startswith("Exception IOError: "), s)
1968 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001969
1970 # Systematic tests of the text I/O API
1971
Antoine Pitrou19690592009-06-12 20:14:08 +00001972 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001973 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1974 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00001975 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001976 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00001977 self.assertEqual(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001978 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001979 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001980 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00001981 self.assertEqual(f.tell(), 0)
1982 self.assertEqual(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001983 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00001984 self.assertEqual(f.seek(0), 0)
1985 self.assertEqual(f.read(None), "abc")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001986 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001987 self.assertEqual(f.read(2), "ab")
1988 self.assertEqual(f.read(1), "c")
1989 self.assertEqual(f.read(1), "")
1990 self.assertEqual(f.read(), "")
1991 self.assertEqual(f.tell(), cookie)
1992 self.assertEqual(f.seek(0), 0)
1993 self.assertEqual(f.seek(0, 2), cookie)
1994 self.assertEqual(f.write("def"), 3)
1995 self.assertEqual(f.seek(cookie), cookie)
1996 self.assertEqual(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001997 if enc.startswith("utf"):
1998 self.multi_line_test(f, enc)
1999 f.close()
2000
2001 def multi_line_test(self, f, enc):
2002 f.seek(0)
2003 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00002004 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002005 wlines = []
2006 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2007 chars = []
2008 for i in range(size):
2009 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00002010 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002011 wlines.append((f.tell(), line))
2012 f.write(line)
2013 f.seek(0)
2014 rlines = []
2015 while True:
2016 pos = f.tell()
2017 line = f.readline()
2018 if not line:
2019 break
2020 rlines.append((pos, line))
Ezio Melotti2623a372010-11-21 13:34:58 +00002021 self.assertEqual(rlines, wlines)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002022
Antoine Pitrou19690592009-06-12 20:14:08 +00002023 def test_telling(self):
2024 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002025 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002026 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002027 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002028 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002029 p2 = f.tell()
2030 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002031 self.assertEqual(f.tell(), p0)
2032 self.assertEqual(f.readline(), "\xff\n")
2033 self.assertEqual(f.tell(), p1)
2034 self.assertEqual(f.readline(), "\xff\n")
2035 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002036 f.seek(0)
2037 for line in f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002038 self.assertEqual(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002039 self.assertRaises(IOError, f.tell)
Ezio Melotti2623a372010-11-21 13:34:58 +00002040 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002041 f.close()
2042
Antoine Pitrou19690592009-06-12 20:14:08 +00002043 def test_seeking(self):
2044 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00002045 prefix_size = chunk_size - 2
2046 u_prefix = "a" * prefix_size
2047 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melotti2623a372010-11-21 13:34:58 +00002048 self.assertEqual(len(u_prefix), len(prefix))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002049 u_suffix = "\u8888\n"
2050 suffix = bytes(u_suffix.encode("utf-8"))
2051 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00002052 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002053 f.write(line*2)
2054 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002055 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002056 s = f.read(prefix_size)
Ezio Melotti2623a372010-11-21 13:34:58 +00002057 self.assertEqual(s, prefix.decode("ascii"))
2058 self.assertEqual(f.tell(), prefix_size)
2059 self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002060
Antoine Pitrou19690592009-06-12 20:14:08 +00002061 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002062 # Regression test for a specific bug
2063 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00002064 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002065 f.write(data)
2066 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002067 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002068 f._CHUNK_SIZE # Just test that it exists
2069 f._CHUNK_SIZE = 2
2070 f.readline()
2071 f.tell()
2072
Antoine Pitrou19690592009-06-12 20:14:08 +00002073 def test_seek_and_tell(self):
2074 #Test seek/tell using the StatefulIncrementalDecoder.
2075 # Make test faster by doing smaller seeks
2076 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00002077
Antoine Pitrou19690592009-06-12 20:14:08 +00002078 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002079 """Tell/seek to various points within a data stream and ensure
2080 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00002081 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00002082 f.write(data)
2083 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002084 f = self.open(support.TESTFN, encoding='test_decoder')
2085 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00002086 decoded = f.read()
2087 f.close()
2088
2089 for i in range(min_pos, len(decoded) + 1): # seek positions
2090 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00002091 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melotti2623a372010-11-21 13:34:58 +00002092 self.assertEqual(f.read(i), decoded[:i])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002093 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002094 self.assertEqual(f.read(j), decoded[i:i + j])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002095 f.seek(cookie)
Ezio Melotti2623a372010-11-21 13:34:58 +00002096 self.assertEqual(f.read(), decoded[i:])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002097 f.close()
2098
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002099 # Enable the test decoder.
2100 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00002101
2102 # Run the tests.
2103 try:
2104 # Try each test case.
2105 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00002106 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002107
2108 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00002109 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2110 offset = CHUNK_SIZE - len(input)//2
2111 prefix = b'.'*offset
2112 # Don't bother seeking into the prefix (takes too long).
2113 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00002114 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002115
2116 # Ensure our test decoder won't interfere with subsequent tests.
2117 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002118 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002119
Antoine Pitrou19690592009-06-12 20:14:08 +00002120 def test_encoded_writes(self):
2121 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002122 tests = ("utf-16",
2123 "utf-16-le",
2124 "utf-16-be",
2125 "utf-32",
2126 "utf-32-le",
2127 "utf-32-be")
2128 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002129 buf = self.BytesIO()
2130 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002131 # Check if the BOM is written only once (see issue1753).
2132 f.write(data)
2133 f.write(data)
2134 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002135 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002136 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002137 self.assertEqual(f.read(), data * 2)
2138 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002139
Antoine Pitrou19690592009-06-12 20:14:08 +00002140 def test_unreadable(self):
2141 class UnReadable(self.BytesIO):
2142 def readable(self):
2143 return False
2144 txt = self.TextIOWrapper(UnReadable())
2145 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002146
Antoine Pitrou19690592009-06-12 20:14:08 +00002147 def test_read_one_by_one(self):
2148 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002149 reads = ""
2150 while True:
2151 c = txt.read(1)
2152 if not c:
2153 break
2154 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002155 self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002156
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002157 def test_readlines(self):
2158 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2159 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2160 txt.seek(0)
2161 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2162 txt.seek(0)
2163 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2164
Christian Heimes1a6387e2008-03-26 12:49:49 +00002165 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002166 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002167 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002168 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002169 reads = ""
2170 while True:
2171 c = txt.read(128)
2172 if not c:
2173 break
2174 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002175 self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002176
2177 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002178 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002179
2180 # read one char at a time
2181 reads = ""
2182 while True:
2183 c = txt.read(1)
2184 if not c:
2185 break
2186 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002187 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002188
2189 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002190 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002191 txt._CHUNK_SIZE = 4
2192
2193 reads = ""
2194 while True:
2195 c = txt.read(4)
2196 if not c:
2197 break
2198 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002199 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002200
2201 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002202 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002203 txt._CHUNK_SIZE = 4
2204
2205 reads = txt.read(4)
2206 reads += txt.read(4)
2207 reads += txt.readline()
2208 reads += txt.readline()
2209 reads += txt.readline()
Ezio Melotti2623a372010-11-21 13:34:58 +00002210 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002211
2212 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002213 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002214 txt._CHUNK_SIZE = 4
2215
2216 reads = txt.read(4)
2217 reads += txt.read()
Ezio Melotti2623a372010-11-21 13:34:58 +00002218 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002219
2220 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002221 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002222 txt._CHUNK_SIZE = 4
2223
2224 reads = txt.read(4)
2225 pos = txt.tell()
2226 txt.seek(0)
2227 txt.seek(pos)
Ezio Melotti2623a372010-11-21 13:34:58 +00002228 self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002229
2230 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002231 buffer = self.BytesIO(self.testdata)
2232 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002233
2234 self.assertEqual(buffer.seekable(), txt.seekable())
2235
Antoine Pitrou19690592009-06-12 20:14:08 +00002236 def test_append_bom(self):
2237 # The BOM is not written again when appending to a non-empty file
2238 filename = support.TESTFN
2239 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2240 with self.open(filename, 'w', encoding=charset) as f:
2241 f.write('aaa')
2242 pos = f.tell()
2243 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002244 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002245
2246 with self.open(filename, 'a', encoding=charset) as f:
2247 f.write('xxx')
2248 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002249 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002250
Antoine Pitrou19690592009-06-12 20:14:08 +00002251 def test_seek_bom(self):
2252 # Same test, but when seeking manually
2253 filename = support.TESTFN
2254 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2255 with self.open(filename, 'w', encoding=charset) as f:
2256 f.write('aaa')
2257 pos = f.tell()
2258 with self.open(filename, 'r+', encoding=charset) as f:
2259 f.seek(pos)
2260 f.write('zzz')
2261 f.seek(0)
2262 f.write('bbb')
2263 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002264 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002265
2266 def test_errors_property(self):
2267 with self.open(support.TESTFN, "w") as f:
2268 self.assertEqual(f.errors, "strict")
2269 with self.open(support.TESTFN, "w", errors="replace") as f:
2270 self.assertEqual(f.errors, "replace")
2271
Victor Stinner6a102812010-04-27 23:55:59 +00002272 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002273 def test_threads_write(self):
2274 # Issue6750: concurrent writes could duplicate data
2275 event = threading.Event()
2276 with self.open(support.TESTFN, "w", buffering=1) as f:
2277 def run(n):
2278 text = "Thread%03d\n" % n
2279 event.wait()
2280 f.write(text)
2281 threads = [threading.Thread(target=lambda n=x: run(n))
2282 for x in range(20)]
2283 for t in threads:
2284 t.start()
2285 time.sleep(0.02)
2286 event.set()
2287 for t in threads:
2288 t.join()
2289 with self.open(support.TESTFN) as f:
2290 content = f.read()
2291 for n in range(20):
Ezio Melotti2623a372010-11-21 13:34:58 +00002292 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002293
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002294 def test_flush_error_on_close(self):
2295 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2296 def bad_flush():
2297 raise IOError()
2298 txt.flush = bad_flush
2299 self.assertRaises(IOError, txt.close) # exception not swallowed
2300
2301 def test_multi_close(self):
2302 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2303 txt.close()
2304 txt.close()
2305 txt.close()
2306 self.assertRaises(ValueError, txt.flush)
2307
Antoine Pitroufc9ead62010-12-21 21:26:55 +00002308 def test_readonly_attributes(self):
2309 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2310 buf = self.BytesIO(self.testdata)
2311 with self.assertRaises((AttributeError, TypeError)):
2312 txt.buffer = buf
2313
Antoine Pitrou19690592009-06-12 20:14:08 +00002314class CTextIOWrapperTest(TextIOWrapperTest):
2315
2316 def test_initialization(self):
2317 r = self.BytesIO(b"\xc3\xa9\n\n")
2318 b = self.BufferedReader(r, 1000)
2319 t = self.TextIOWrapper(b)
2320 self.assertRaises(TypeError, t.__init__, b, newline=42)
2321 self.assertRaises(ValueError, t.read)
2322 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2323 self.assertRaises(ValueError, t.read)
2324
2325 def test_garbage_collection(self):
2326 # C TextIOWrapper objects are collected, and collecting them flushes
2327 # all data to disk.
2328 # The Python version has __del__, so it ends in gc.garbage instead.
2329 rawio = io.FileIO(support.TESTFN, "wb")
2330 b = self.BufferedWriter(rawio)
2331 t = self.TextIOWrapper(b, encoding="ascii")
2332 t.write("456def")
2333 t.x = t
2334 wr = weakref.ref(t)
2335 del t
2336 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002337 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002338 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002339 self.assertEqual(f.read(), b"456def")
2340
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002341 def test_rwpair_cleared_before_textio(self):
2342 # Issue 13070: TextIOWrapper's finalization would crash when called
2343 # after the reference to the underlying BufferedRWPair's writer got
2344 # cleared by the GC.
2345 for i in range(1000):
2346 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2347 t1 = self.TextIOWrapper(b1, encoding="ascii")
2348 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2349 t2 = self.TextIOWrapper(b2, encoding="ascii")
2350 # circular references
2351 t1.buddy = t2
2352 t2.buddy = t1
2353 support.gc_collect()
2354
2355
Antoine Pitrou19690592009-06-12 20:14:08 +00002356class PyTextIOWrapperTest(TextIOWrapperTest):
2357 pass
2358
2359
2360class IncrementalNewlineDecoderTest(unittest.TestCase):
2361
2362 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002363 # UTF-8 specific tests for a newline decoder
2364 def _check_decode(b, s, **kwargs):
2365 # We exercise getstate() / setstate() as well as decode()
2366 state = decoder.getstate()
Ezio Melotti2623a372010-11-21 13:34:58 +00002367 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002368 decoder.setstate(state)
Ezio Melotti2623a372010-11-21 13:34:58 +00002369 self.assertEqual(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002370
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002371 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002372
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002373 _check_decode(b'\xe8', "")
2374 _check_decode(b'\xa2', "")
2375 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002376
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002377 _check_decode(b'\xe8', "")
2378 _check_decode(b'\xa2', "")
2379 _check_decode(b'\x88', "\u8888")
2380
2381 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002382 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2383
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002384 decoder.reset()
2385 _check_decode(b'\n', "\n")
2386 _check_decode(b'\r', "")
2387 _check_decode(b'', "\n", final=True)
2388 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002389
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002390 _check_decode(b'\r', "")
2391 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002392
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002393 _check_decode(b'\r\r\n', "\n\n")
2394 _check_decode(b'\r', "")
2395 _check_decode(b'\r', "\n")
2396 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002397
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002398 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2399 _check_decode(b'\xe8\xa2\x88', "\u8888")
2400 _check_decode(b'\n', "\n")
2401 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2402 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002403
Antoine Pitrou19690592009-06-12 20:14:08 +00002404 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002405 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002406 if encoding is not None:
2407 encoder = codecs.getincrementalencoder(encoding)()
2408 def _decode_bytewise(s):
2409 # Decode one byte at a time
2410 for b in encoder.encode(s):
2411 result.append(decoder.decode(b))
2412 else:
2413 encoder = None
2414 def _decode_bytewise(s):
2415 # Decode one char at a time
2416 for c in s:
2417 result.append(decoder.decode(c))
Ezio Melotti2623a372010-11-21 13:34:58 +00002418 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002419 _decode_bytewise("abc\n\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002420 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002421 _decode_bytewise("\nabc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002422 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002423 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002424 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002425 _decode_bytewise("abc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002426 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002427 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002428 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002429 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002430 input = "abc"
2431 if encoder is not None:
2432 encoder.reset()
2433 input = encoder.encode(input)
Ezio Melotti2623a372010-11-21 13:34:58 +00002434 self.assertEqual(decoder.decode(input), "abc")
2435 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002436
2437 def test_newline_decoder(self):
2438 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002439 # None meaning the IncrementalNewlineDecoder takes unicode input
2440 # rather than bytes input
2441 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002442 'utf-16', 'utf-16-le', 'utf-16-be',
2443 'utf-32', 'utf-32-le', 'utf-32-be',
2444 )
2445 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002446 decoder = enc and codecs.getincrementaldecoder(enc)()
2447 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2448 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002449 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002450 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2451 self.check_newline_decoding_utf8(decoder)
2452
2453 def test_newline_bytes(self):
2454 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2455 def _check(dec):
Ezio Melotti2623a372010-11-21 13:34:58 +00002456 self.assertEqual(dec.newlines, None)
2457 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2458 self.assertEqual(dec.newlines, None)
2459 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2460 self.assertEqual(dec.newlines, None)
Antoine Pitrou19690592009-06-12 20:14:08 +00002461 dec = self.IncrementalNewlineDecoder(None, translate=False)
2462 _check(dec)
2463 dec = self.IncrementalNewlineDecoder(None, translate=True)
2464 _check(dec)
2465
2466class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2467 pass
2468
2469class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2470 pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002471
Christian Heimes1a6387e2008-03-26 12:49:49 +00002472
2473# XXX Tests for open()
2474
2475class MiscIOTest(unittest.TestCase):
2476
Benjamin Petersonad100c32008-11-20 22:06:22 +00002477 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002478 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002479
Antoine Pitrou19690592009-06-12 20:14:08 +00002480 def test___all__(self):
2481 for name in self.io.__all__:
2482 obj = getattr(self.io, name, None)
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002483 self.assertTrue(obj is not None, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002484 if name == "open":
2485 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002486 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002487 self.assertTrue(issubclass(obj, Exception), name)
2488 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002489 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002490
Benjamin Petersonad100c32008-11-20 22:06:22 +00002491 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002492 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002493 self.assertEqual(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002494 f.close()
2495
Antoine Pitrou19690592009-06-12 20:14:08 +00002496 f = self.open(support.TESTFN, "U")
Ezio Melotti2623a372010-11-21 13:34:58 +00002497 self.assertEqual(f.name, support.TESTFN)
2498 self.assertEqual(f.buffer.name, support.TESTFN)
2499 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2500 self.assertEqual(f.mode, "U")
2501 self.assertEqual(f.buffer.mode, "rb")
2502 self.assertEqual(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002503 f.close()
2504
Antoine Pitrou19690592009-06-12 20:14:08 +00002505 f = self.open(support.TESTFN, "w+")
Ezio Melotti2623a372010-11-21 13:34:58 +00002506 self.assertEqual(f.mode, "w+")
2507 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2508 self.assertEqual(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002509
Antoine Pitrou19690592009-06-12 20:14:08 +00002510 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melotti2623a372010-11-21 13:34:58 +00002511 self.assertEqual(g.mode, "wb")
2512 self.assertEqual(g.raw.mode, "wb")
2513 self.assertEqual(g.name, f.fileno())
2514 self.assertEqual(g.raw.name, f.fileno())
Benjamin Petersonad100c32008-11-20 22:06:22 +00002515 f.close()
2516 g.close()
2517
Antoine Pitrou19690592009-06-12 20:14:08 +00002518 def test_io_after_close(self):
2519 for kwargs in [
2520 {"mode": "w"},
2521 {"mode": "wb"},
2522 {"mode": "w", "buffering": 1},
2523 {"mode": "w", "buffering": 2},
2524 {"mode": "wb", "buffering": 0},
2525 {"mode": "r"},
2526 {"mode": "rb"},
2527 {"mode": "r", "buffering": 1},
2528 {"mode": "r", "buffering": 2},
2529 {"mode": "rb", "buffering": 0},
2530 {"mode": "w+"},
2531 {"mode": "w+b"},
2532 {"mode": "w+", "buffering": 1},
2533 {"mode": "w+", "buffering": 2},
2534 {"mode": "w+b", "buffering": 0},
2535 ]:
2536 f = self.open(support.TESTFN, **kwargs)
2537 f.close()
2538 self.assertRaises(ValueError, f.flush)
2539 self.assertRaises(ValueError, f.fileno)
2540 self.assertRaises(ValueError, f.isatty)
2541 self.assertRaises(ValueError, f.__iter__)
2542 if hasattr(f, "peek"):
2543 self.assertRaises(ValueError, f.peek, 1)
2544 self.assertRaises(ValueError, f.read)
2545 if hasattr(f, "read1"):
2546 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinner5100a402011-05-25 22:15:36 +02002547 if hasattr(f, "readall"):
2548 self.assertRaises(ValueError, f.readall)
Antoine Pitrou19690592009-06-12 20:14:08 +00002549 if hasattr(f, "readinto"):
2550 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2551 self.assertRaises(ValueError, f.readline)
2552 self.assertRaises(ValueError, f.readlines)
2553 self.assertRaises(ValueError, f.seek, 0)
2554 self.assertRaises(ValueError, f.tell)
2555 self.assertRaises(ValueError, f.truncate)
2556 self.assertRaises(ValueError, f.write,
2557 b"" if "b" in kwargs['mode'] else "")
2558 self.assertRaises(ValueError, f.writelines, [])
2559 self.assertRaises(ValueError, next, f)
2560
2561 def test_blockingioerror(self):
2562 # Various BlockingIOError issues
2563 self.assertRaises(TypeError, self.BlockingIOError)
2564 self.assertRaises(TypeError, self.BlockingIOError, 1)
2565 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2566 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2567 b = self.BlockingIOError(1, "")
2568 self.assertEqual(b.characters_written, 0)
2569 class C(unicode):
2570 pass
2571 c = C("")
2572 b = self.BlockingIOError(1, c)
2573 c.b = b
2574 b.c = c
2575 wr = weakref.ref(c)
2576 del c, b
2577 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002578 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002579
2580 def test_abcs(self):
2581 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002582 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2583 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2584 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2585 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002586
2587 def _check_abc_inheritance(self, abcmodule):
2588 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002589 self.assertIsInstance(f, abcmodule.IOBase)
2590 self.assertIsInstance(f, abcmodule.RawIOBase)
2591 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2592 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002593 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002594 self.assertIsInstance(f, abcmodule.IOBase)
2595 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2596 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2597 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002598 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002599 self.assertIsInstance(f, abcmodule.IOBase)
2600 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2601 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2602 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002603
2604 def test_abc_inheritance(self):
2605 # Test implementations inherit from their respective ABCs
2606 self._check_abc_inheritance(self)
2607
2608 def test_abc_inheritance_official(self):
2609 # Test implementations inherit from the official ABCs of the
2610 # baseline "io" module.
2611 self._check_abc_inheritance(io)
2612
2613class CMiscIOTest(MiscIOTest):
2614 io = io
2615
2616class PyMiscIOTest(MiscIOTest):
2617 io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002618
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002619
2620@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2621class SignalsTest(unittest.TestCase):
2622
2623 def setUp(self):
2624 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2625
2626 def tearDown(self):
2627 signal.signal(signal.SIGALRM, self.oldalrm)
2628
2629 def alarm_interrupt(self, sig, frame):
Florent Xicluna3fa3b002010-09-13 08:20:19 +00002630 1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002631
2632 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinner49d495f2011-07-04 11:44:46 +02002633 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
2634 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002635 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2636 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou6439c002011-02-25 21:35:47 +00002637 invokes the signal handler, and bubbles up the exception raised
2638 in the latter."""
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002639 read_results = []
2640 def _read():
2641 s = os.read(r, 1)
2642 read_results.append(s)
2643 t = threading.Thread(target=_read)
2644 t.daemon = True
2645 r, w = os.pipe()
2646 try:
2647 wio = self.io.open(w, **fdopen_kwargs)
2648 t.start()
2649 signal.alarm(1)
2650 # Fill the pipe enough that the write will be blocking.
2651 # It will be interrupted by the timer armed above. Since the
2652 # other thread has read one byte, the low-level write will
2653 # return with a successful (partial) result rather than an EINTR.
2654 # The buffered IO layer must check for pending signal
2655 # handlers, which in this case will invoke alarm_interrupt().
2656 self.assertRaises(ZeroDivisionError,
2657 wio.write, item * (1024 * 1024))
2658 t.join()
2659 # We got one byte, get another one and check that it isn't a
2660 # repeat of the first one.
2661 read_results.append(os.read(r, 1))
2662 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2663 finally:
2664 os.close(w)
2665 os.close(r)
2666 # This is deliberate. If we didn't close the file descriptor
2667 # before closing wio, wio would try to flush its internal
2668 # buffer, and block again.
2669 try:
2670 wio.close()
2671 except IOError as e:
2672 if e.errno != errno.EBADF:
2673 raise
2674
2675 def test_interrupted_write_unbuffered(self):
2676 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2677
2678 def test_interrupted_write_buffered(self):
2679 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2680
2681 def test_interrupted_write_text(self):
2682 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2683
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002684 def check_reentrant_write(self, data, **fdopen_kwargs):
2685 def on_alarm(*args):
2686 # Will be called reentrantly from the same thread
2687 wio.write(data)
Victor Stinner4c41f842011-07-05 13:29:26 +02002688 1//0
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002689 signal.signal(signal.SIGALRM, on_alarm)
2690 r, w = os.pipe()
2691 wio = self.io.open(w, **fdopen_kwargs)
2692 try:
2693 signal.alarm(1)
2694 # Either the reentrant call to wio.write() fails with RuntimeError,
2695 # or the signal handler raises ZeroDivisionError.
2696 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2697 while 1:
2698 for i in range(100):
2699 wio.write(data)
2700 wio.flush()
2701 # Make sure the buffer doesn't fill up and block further writes
2702 os.read(r, len(data) * 100)
2703 exc = cm.exception
2704 if isinstance(exc, RuntimeError):
2705 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2706 finally:
2707 wio.close()
2708 os.close(r)
2709
2710 def test_reentrant_write_buffered(self):
2711 self.check_reentrant_write(b"xy", mode="wb")
2712
2713 def test_reentrant_write_text(self):
2714 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2715
Antoine Pitrou6439c002011-02-25 21:35:47 +00002716 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2717 """Check that a buffered read, when it gets interrupted (either
2718 returning a partial result or EINTR), properly invokes the signal
2719 handler and retries if the latter returned successfully."""
2720 r, w = os.pipe()
2721 fdopen_kwargs["closefd"] = False
2722 def alarm_handler(sig, frame):
2723 os.write(w, b"bar")
2724 signal.signal(signal.SIGALRM, alarm_handler)
2725 try:
2726 rio = self.io.open(r, **fdopen_kwargs)
2727 os.write(w, b"foo")
2728 signal.alarm(1)
2729 # Expected behaviour:
2730 # - first raw read() returns partial b"foo"
2731 # - second raw read() returns EINTR
2732 # - third raw read() returns b"bar"
2733 self.assertEqual(decode(rio.read(6)), "foobar")
2734 finally:
2735 rio.close()
2736 os.close(w)
2737 os.close(r)
2738
2739 def test_interrupterd_read_retry_buffered(self):
2740 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
2741 mode="rb")
2742
2743 def test_interrupterd_read_retry_text(self):
2744 self.check_interrupted_read_retry(lambda x: x,
2745 mode="r")
2746
2747 @unittest.skipUnless(threading, 'Threading required for this test.')
2748 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
2749 """Check that a buffered write, when it gets interrupted (either
2750 returning a partial result or EINTR), properly invokes the signal
2751 handler and retries if the latter returned successfully."""
2752 select = support.import_module("select")
2753 # A quantity that exceeds the buffer size of an anonymous pipe's
2754 # write end.
2755 N = 1024 * 1024
2756 r, w = os.pipe()
2757 fdopen_kwargs["closefd"] = False
2758 # We need a separate thread to read from the pipe and allow the
2759 # write() to finish. This thread is started after the SIGALRM is
2760 # received (forcing a first EINTR in write()).
2761 read_results = []
2762 write_finished = False
2763 def _read():
2764 while not write_finished:
2765 while r in select.select([r], [], [], 1.0)[0]:
2766 s = os.read(r, 1024)
2767 read_results.append(s)
2768 t = threading.Thread(target=_read)
2769 t.daemon = True
2770 def alarm1(sig, frame):
2771 signal.signal(signal.SIGALRM, alarm2)
2772 signal.alarm(1)
2773 def alarm2(sig, frame):
2774 t.start()
2775 signal.signal(signal.SIGALRM, alarm1)
2776 try:
2777 wio = self.io.open(w, **fdopen_kwargs)
2778 signal.alarm(1)
2779 # Expected behaviour:
2780 # - first raw write() is partial (because of the limited pipe buffer
2781 # and the first alarm)
2782 # - second raw write() returns EINTR (because of the second alarm)
2783 # - subsequent write()s are successful (either partial or complete)
2784 self.assertEqual(N, wio.write(item * N))
2785 wio.flush()
2786 write_finished = True
2787 t.join()
2788 self.assertEqual(N, sum(len(x) for x in read_results))
2789 finally:
2790 write_finished = True
2791 os.close(w)
2792 os.close(r)
2793 # This is deliberate. If we didn't close the file descriptor
2794 # before closing wio, wio would try to flush its internal
2795 # buffer, and could block (in case of failure).
2796 try:
2797 wio.close()
2798 except IOError as e:
2799 if e.errno != errno.EBADF:
2800 raise
2801
2802 def test_interrupterd_write_retry_buffered(self):
2803 self.check_interrupted_write_retry(b"x", mode="wb")
2804
2805 def test_interrupterd_write_retry_text(self):
2806 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
2807
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002808
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002809class CSignalsTest(SignalsTest):
2810 io = io
2811
2812class PySignalsTest(SignalsTest):
2813 io = pyio
2814
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002815 # Handling reentrancy issues would slow down _pyio even more, so the
2816 # tests are disabled.
2817 test_reentrant_write_buffered = None
2818 test_reentrant_write_text = None
2819
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002820
Christian Heimes1a6387e2008-03-26 12:49:49 +00002821def test_main():
Antoine Pitrou19690592009-06-12 20:14:08 +00002822 tests = (CIOTest, PyIOTest,
2823 CBufferedReaderTest, PyBufferedReaderTest,
2824 CBufferedWriterTest, PyBufferedWriterTest,
2825 CBufferedRWPairTest, PyBufferedRWPairTest,
2826 CBufferedRandomTest, PyBufferedRandomTest,
2827 StatefulIncrementalDecoderTest,
2828 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2829 CTextIOWrapperTest, PyTextIOWrapperTest,
2830 CMiscIOTest, PyMiscIOTest,
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002831 CSignalsTest, PySignalsTest,
Antoine Pitrou19690592009-06-12 20:14:08 +00002832 )
2833
2834 # Put the namespaces of the IO module we are testing and some useful mock
2835 # classes in the __dict__ of each test.
2836 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou6391b342010-09-14 18:48:19 +00002837 MockNonBlockWriterIO, MockRawIOWithoutRead)
Antoine Pitrou19690592009-06-12 20:14:08 +00002838 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2839 c_io_ns = dict((name, getattr(io, name)) for name in all_members)
2840 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
2841 globs = globals()
2842 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2843 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2844 # Avoid turning open into a bound method.
2845 py_io_ns["open"] = pyio.OpenWrapper
2846 for test in tests:
2847 if test.__name__.startswith("C"):
2848 for name, obj in c_io_ns.items():
2849 setattr(test, name, obj)
2850 elif test.__name__.startswith("Py"):
2851 for name, obj in py_io_ns.items():
2852 setattr(test, name, obj)
2853
2854 support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002855
2856if __name__ == "__main__":
Antoine Pitrou19690592009-06-12 20:14:08 +00002857 test_main()