blob: c423dd67047f8a63a69d3f9a210d71d679590893 [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Antoine Pitrou19690592009-06-12 20:14:08 +000032import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000033import signal
34import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000035from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000036from collections import deque
37from test import test_support as support
Christian Heimes1a6387e2008-03-26 12:49:49 +000038
39import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000042try:
43 import threading
44except ImportError:
45 threading = None
Antoine Pitrou19690592009-06-12 20:14:08 +000046
47__metaclass__ = type
48bytes = support.py3k_bytes
49
50def _default_chunk_size():
51 """Get the default TextIOWrapper chunk size"""
52 with io.open(__file__, "r", encoding="latin1") as f:
53 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000054
55
Antoine Pitrou6391b342010-09-14 18:48:19 +000056class MockRawIOWithoutRead:
57 """A RawIO implementation without read(), so as to exercise the default
58 RawIO.read() which calls readinto()."""
Christian Heimes1a6387e2008-03-26 12:49:49 +000059
60 def __init__(self, read_stack=()):
61 self._read_stack = list(read_stack)
62 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +000063 self._reads = 0
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000064 self._extraneous_reads = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +000065
Christian Heimes1a6387e2008-03-26 12:49:49 +000066 def write(self, b):
Antoine Pitrou19690592009-06-12 20:14:08 +000067 self._write_stack.append(bytes(b))
Christian Heimes1a6387e2008-03-26 12:49:49 +000068 return len(b)
69
70 def writable(self):
71 return True
72
73 def fileno(self):
74 return 42
75
76 def readable(self):
77 return True
78
79 def seekable(self):
80 return True
81
82 def seek(self, pos, whence):
Antoine Pitrou19690592009-06-12 20:14:08 +000083 return 0 # wrong but we gotta return something
Christian Heimes1a6387e2008-03-26 12:49:49 +000084
85 def tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +000086 return 0 # same comment as above
87
88 def readinto(self, buf):
89 self._reads += 1
90 max_len = len(buf)
91 try:
92 data = self._read_stack[0]
93 except IndexError:
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000094 self._extraneous_reads += 1
Antoine Pitrou19690592009-06-12 20:14:08 +000095 return 0
96 if data is None:
97 del self._read_stack[0]
98 return None
99 n = len(data)
100 if len(data) <= max_len:
101 del self._read_stack[0]
102 buf[:n] = data
103 return n
104 else:
105 buf[:] = data[:max_len]
106 self._read_stack[0] = data[max_len:]
107 return max_len
108
109 def truncate(self, pos=None):
110 return pos
111
Antoine Pitrou6391b342010-09-14 18:48:19 +0000112class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
113 pass
114
115class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
116 pass
117
118
119class MockRawIO(MockRawIOWithoutRead):
120
121 def read(self, n=None):
122 self._reads += 1
123 try:
124 return self._read_stack.pop(0)
125 except:
126 self._extraneous_reads += 1
127 return b""
128
Antoine Pitrou19690592009-06-12 20:14:08 +0000129class CMockRawIO(MockRawIO, io.RawIOBase):
130 pass
131
132class PyMockRawIO(MockRawIO, pyio.RawIOBase):
133 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000134
135
Antoine Pitrou19690592009-06-12 20:14:08 +0000136class MisbehavedRawIO(MockRawIO):
137 def write(self, b):
138 return MockRawIO.write(self, b) * 2
139
140 def read(self, n=None):
141 return MockRawIO.read(self, n) * 2
142
143 def seek(self, pos, whence):
144 return -123
145
146 def tell(self):
147 return -456
148
149 def readinto(self, buf):
150 MockRawIO.readinto(self, buf)
151 return len(buf) * 5
152
153class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
154 pass
155
156class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
157 pass
158
159
160class CloseFailureIO(MockRawIO):
161 closed = 0
162
163 def close(self):
164 if not self.closed:
165 self.closed = 1
166 raise IOError
167
168class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
169 pass
170
171class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
172 pass
173
174
175class MockFileIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000176
177 def __init__(self, data):
178 self.read_history = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000179 super(MockFileIO, self).__init__(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000180
181 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +0000182 res = super(MockFileIO, self).read(n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000183 self.read_history.append(None if res is None else len(res))
184 return res
185
Antoine Pitrou19690592009-06-12 20:14:08 +0000186 def readinto(self, b):
187 res = super(MockFileIO, self).readinto(b)
188 self.read_history.append(res)
189 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000190
Antoine Pitrou19690592009-06-12 20:14:08 +0000191class CMockFileIO(MockFileIO, io.BytesIO):
192 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000193
Antoine Pitrou19690592009-06-12 20:14:08 +0000194class PyMockFileIO(MockFileIO, pyio.BytesIO):
195 pass
196
197
198class MockNonBlockWriterIO:
199
200 def __init__(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000201 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000202 self._blocker_char = None
Christian Heimes1a6387e2008-03-26 12:49:49 +0000203
Antoine Pitrou19690592009-06-12 20:14:08 +0000204 def pop_written(self):
205 s = b"".join(self._write_stack)
206 self._write_stack[:] = []
207 return s
208
209 def block_on(self, char):
210 """Block when a given char is encountered."""
211 self._blocker_char = char
212
213 def readable(self):
214 return True
215
216 def seekable(self):
217 return True
Christian Heimes1a6387e2008-03-26 12:49:49 +0000218
219 def writable(self):
220 return True
221
Antoine Pitrou19690592009-06-12 20:14:08 +0000222 def write(self, b):
223 b = bytes(b)
224 n = -1
225 if self._blocker_char:
226 try:
227 n = b.index(self._blocker_char)
228 except ValueError:
229 pass
230 else:
231 self._blocker_char = None
232 self._write_stack.append(b[:n])
233 raise self.BlockingIOError(0, "test blocking", n)
234 self._write_stack.append(b)
235 return len(b)
236
237class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
238 BlockingIOError = io.BlockingIOError
239
240class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
241 BlockingIOError = pyio.BlockingIOError
242
Christian Heimes1a6387e2008-03-26 12:49:49 +0000243
244class IOTest(unittest.TestCase):
245
Antoine Pitrou19690592009-06-12 20:14:08 +0000246 def setUp(self):
247 support.unlink(support.TESTFN)
248
Christian Heimes1a6387e2008-03-26 12:49:49 +0000249 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000250 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000251
252 def write_ops(self, f):
253 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000254 f.truncate(0)
255 self.assertEqual(f.tell(), 5)
256 f.seek(0)
257
258 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000259 self.assertEqual(f.seek(0), 0)
260 self.assertEqual(f.write(b"Hello."), 6)
261 self.assertEqual(f.tell(), 6)
262 self.assertEqual(f.seek(-1, 1), 5)
263 self.assertEqual(f.tell(), 5)
264 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
265 self.assertEqual(f.seek(0), 0)
266 self.assertEqual(f.write(b"h"), 1)
267 self.assertEqual(f.seek(-1, 2), 13)
268 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000269
Christian Heimes1a6387e2008-03-26 12:49:49 +0000270 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000271 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000272 self.assertRaises(TypeError, f.seek, 0.0)
273
274 def read_ops(self, f, buffered=False):
275 data = f.read(5)
276 self.assertEqual(data, b"hello")
277 data = bytearray(data)
278 self.assertEqual(f.readinto(data), 5)
279 self.assertEqual(data, b" worl")
280 self.assertEqual(f.readinto(data), 2)
281 self.assertEqual(len(data), 5)
282 self.assertEqual(data[:2], b"d\n")
283 self.assertEqual(f.seek(0), 0)
284 self.assertEqual(f.read(20), b"hello world\n")
285 self.assertEqual(f.read(1), b"")
286 self.assertEqual(f.readinto(bytearray(b"x")), 0)
287 self.assertEqual(f.seek(-6, 2), 6)
288 self.assertEqual(f.read(5), b"world")
289 self.assertEqual(f.read(0), b"")
290 self.assertEqual(f.readinto(bytearray()), 0)
291 self.assertEqual(f.seek(-6, 1), 5)
292 self.assertEqual(f.read(5), b" worl")
293 self.assertEqual(f.tell(), 10)
294 self.assertRaises(TypeError, f.seek, 0.0)
295 if buffered:
296 f.seek(0)
297 self.assertEqual(f.read(), b"hello world\n")
298 f.seek(6)
299 self.assertEqual(f.read(), b"world\n")
300 self.assertEqual(f.read(), b"")
301
302 LARGE = 2**31
303
304 def large_file_ops(self, f):
305 assert f.readable()
306 assert f.writable()
307 self.assertEqual(f.seek(self.LARGE), self.LARGE)
308 self.assertEqual(f.tell(), self.LARGE)
309 self.assertEqual(f.write(b"xxx"), 3)
310 self.assertEqual(f.tell(), self.LARGE + 3)
311 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
312 self.assertEqual(f.truncate(), self.LARGE + 2)
313 self.assertEqual(f.tell(), self.LARGE + 2)
314 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
315 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000316 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000317 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
318 self.assertEqual(f.seek(-1, 2), self.LARGE)
319 self.assertEqual(f.read(2), b"x")
320
Antoine Pitrou19690592009-06-12 20:14:08 +0000321 def test_invalid_operations(self):
322 # Try writing on a file opened in read mode and vice-versa.
323 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000324 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000325 self.assertRaises(IOError, fp.read)
326 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000327 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000328 self.assertRaises(IOError, fp.write, b"blah")
329 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000330 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000331 self.assertRaises(IOError, fp.write, "blah")
332 self.assertRaises(IOError, fp.writelines, ["blah\n"])
333
Christian Heimes1a6387e2008-03-26 12:49:49 +0000334 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000335 with self.open(support.TESTFN, "wb", buffering=0) as f:
336 self.assertEqual(f.readable(), False)
337 self.assertEqual(f.writable(), True)
338 self.assertEqual(f.seekable(), True)
339 self.write_ops(f)
340 with self.open(support.TESTFN, "rb", buffering=0) as f:
341 self.assertEqual(f.readable(), True)
342 self.assertEqual(f.writable(), False)
343 self.assertEqual(f.seekable(), True)
344 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000345
346 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000347 with self.open(support.TESTFN, "wb") as f:
348 self.assertEqual(f.readable(), False)
349 self.assertEqual(f.writable(), True)
350 self.assertEqual(f.seekable(), True)
351 self.write_ops(f)
352 with self.open(support.TESTFN, "rb") as f:
353 self.assertEqual(f.readable(), True)
354 self.assertEqual(f.writable(), False)
355 self.assertEqual(f.seekable(), True)
356 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000357
358 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000359 with self.open(support.TESTFN, "wb") as f:
360 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
361 with self.open(support.TESTFN, "rb") as f:
362 self.assertEqual(f.readline(), b"abc\n")
363 self.assertEqual(f.readline(10), b"def\n")
364 self.assertEqual(f.readline(2), b"xy")
365 self.assertEqual(f.readline(4), b"zzy\n")
366 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000367 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000368 self.assertRaises(TypeError, f.readline, 5.3)
369 with self.open(support.TESTFN, "r") as f:
370 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000371
372 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000373 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000374 self.write_ops(f)
375 data = f.getvalue()
376 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000377 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000378 self.read_ops(f, True)
379
380 def test_large_file_ops(self):
381 # On Windows and Mac OSX this test comsumes large resources; It takes
382 # a long time to build the >2GB file and takes >2GB of disk space
383 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000384 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
385 if not support.is_resource_enabled("largefile"):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000386 print("\nTesting large file ops skipped on %s." % sys.platform,
387 file=sys.stderr)
388 print("It requires %d bytes and a long time." % self.LARGE,
389 file=sys.stderr)
390 print("Use 'regrtest.py -u largefile test_io' to run it.",
391 file=sys.stderr)
392 return
Antoine Pitrou19690592009-06-12 20:14:08 +0000393 with self.open(support.TESTFN, "w+b", 0) as f:
394 self.large_file_ops(f)
395 with self.open(support.TESTFN, "w+b") as f:
396 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000397
398 def test_with_open(self):
399 for bufsize in (0, 1, 100):
400 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000401 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000402 f.write(b"xxx")
403 self.assertEqual(f.closed, True)
404 f = None
405 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000406 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000407 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000408 except ZeroDivisionError:
409 self.assertEqual(f.closed, True)
410 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000411 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000412
Antoine Pitroue741cc62009-01-21 00:45:36 +0000413 # issue 5008
414 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000415 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000416 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000417 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000418 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000419 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000420 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000421 with self.open(support.TESTFN, "a") as f:
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000422 self.assertTrue(f.tell() > 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000423
Christian Heimes1a6387e2008-03-26 12:49:49 +0000424 def test_destructor(self):
425 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000426 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000427 def __del__(self):
428 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000429 try:
430 f = super(MyFileIO, self).__del__
431 except AttributeError:
432 pass
433 else:
434 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000435 def close(self):
436 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000437 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000438 def flush(self):
439 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000440 super(MyFileIO, self).flush()
441 f = MyFileIO(support.TESTFN, "wb")
442 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000443 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000444 support.gc_collect()
445 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000446 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000447 self.assertEqual(f.read(), b"xxx")
448
449 def _check_base_destructor(self, base):
450 record = []
451 class MyIO(base):
452 def __init__(self):
453 # This exercises the availability of attributes on object
454 # destruction.
455 # (in the C version, close() is called by the tp_dealloc
456 # function, not by __del__)
457 self.on_del = 1
458 self.on_close = 2
459 self.on_flush = 3
460 def __del__(self):
461 record.append(self.on_del)
462 try:
463 f = super(MyIO, self).__del__
464 except AttributeError:
465 pass
466 else:
467 f()
468 def close(self):
469 record.append(self.on_close)
470 super(MyIO, self).close()
471 def flush(self):
472 record.append(self.on_flush)
473 super(MyIO, self).flush()
474 f = MyIO()
475 del f
476 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000477 self.assertEqual(record, [1, 2, 3])
478
Antoine Pitrou19690592009-06-12 20:14:08 +0000479 def test_IOBase_destructor(self):
480 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000481
Antoine Pitrou19690592009-06-12 20:14:08 +0000482 def test_RawIOBase_destructor(self):
483 self._check_base_destructor(self.RawIOBase)
484
485 def test_BufferedIOBase_destructor(self):
486 self._check_base_destructor(self.BufferedIOBase)
487
488 def test_TextIOBase_destructor(self):
489 self._check_base_destructor(self.TextIOBase)
490
491 def test_close_flushes(self):
492 with self.open(support.TESTFN, "wb") as f:
493 f.write(b"xxx")
494 with self.open(support.TESTFN, "rb") as f:
495 self.assertEqual(f.read(), b"xxx")
496
497 def test_array_writes(self):
498 a = array.array(b'i', range(10))
499 n = len(a.tostring())
500 with self.open(support.TESTFN, "wb", 0) as f:
501 self.assertEqual(f.write(a), n)
502 with self.open(support.TESTFN, "wb") as f:
503 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000504
505 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000506 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000507 closefd=False)
508
Antoine Pitrou19690592009-06-12 20:14:08 +0000509 def test_read_closed(self):
510 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000511 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000512 with self.open(support.TESTFN, "r") as f:
513 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000514 self.assertEqual(file.read(), "egg\n")
515 file.seek(0)
516 file.close()
517 self.assertRaises(ValueError, file.read)
518
519 def test_no_closefd_with_filename(self):
520 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000521 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000522
523 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000524 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000525 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000526 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000527 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000528 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000529 self.assertEqual(file.buffer.raw.closefd, False)
530
Antoine Pitrou19690592009-06-12 20:14:08 +0000531 def test_garbage_collection(self):
532 # FileIO objects are collected, and collecting them flushes
533 # all data to disk.
534 f = self.FileIO(support.TESTFN, "wb")
535 f.write(b"abcxxx")
536 f.f = f
537 wr = weakref.ref(f)
538 del f
539 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000540 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000541 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000542 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000543
Antoine Pitrou19690592009-06-12 20:14:08 +0000544 def test_unbounded_file(self):
545 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
546 zero = "/dev/zero"
547 if not os.path.exists(zero):
548 self.skipTest("{0} does not exist".format(zero))
549 if sys.maxsize > 0x7FFFFFFF:
550 self.skipTest("test can only run in a 32-bit address space")
551 if support.real_max_memuse < support._2G:
552 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000553 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000554 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000555 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000556 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000557 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000558 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000559
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000560 def test_flush_error_on_close(self):
561 f = self.open(support.TESTFN, "wb", buffering=0)
562 def bad_flush():
563 raise IOError()
564 f.flush = bad_flush
565 self.assertRaises(IOError, f.close) # exception not swallowed
566
567 def test_multi_close(self):
568 f = self.open(support.TESTFN, "wb", buffering=0)
569 f.close()
570 f.close()
571 f.close()
572 self.assertRaises(ValueError, f.flush)
573
Antoine Pitrou6391b342010-09-14 18:48:19 +0000574 def test_RawIOBase_read(self):
575 # Exercise the default RawIOBase.read() implementation (which calls
576 # readinto() internally).
577 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
578 self.assertEqual(rawio.read(2), b"ab")
579 self.assertEqual(rawio.read(2), b"c")
580 self.assertEqual(rawio.read(2), b"d")
581 self.assertEqual(rawio.read(2), None)
582 self.assertEqual(rawio.read(2), b"ef")
583 self.assertEqual(rawio.read(2), b"g")
584 self.assertEqual(rawio.read(2), None)
585 self.assertEqual(rawio.read(2), b"")
586
Antoine Pitrou19690592009-06-12 20:14:08 +0000587class CIOTest(IOTest):
Antoine Pitrou16166452011-07-12 22:04:20 +0200588
589 def test_IOBase_finalize(self):
590 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
591 # class which inherits IOBase and an object of this class are caught
592 # in a reference cycle and close() is already in the method cache.
593 class MyIO(self.IOBase):
594 def close(self):
595 pass
596
597 # create an instance to populate the method cache
598 MyIO()
599 obj = MyIO()
600 obj.obj = obj
601 wr = weakref.ref(obj)
602 del MyIO
603 del obj
604 support.gc_collect()
605 self.assertTrue(wr() is None, wr)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000606
Antoine Pitrou19690592009-06-12 20:14:08 +0000607class PyIOTest(IOTest):
608 test_array_writes = unittest.skip(
609 "len(array.array) returns number of elements rather than bytelength"
610 )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000611
612
Antoine Pitrou19690592009-06-12 20:14:08 +0000613class CommonBufferedTests:
614 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
615
616 def test_detach(self):
617 raw = self.MockRawIO()
618 buf = self.tp(raw)
619 self.assertIs(buf.detach(), raw)
620 self.assertRaises(ValueError, buf.detach)
621
622 def test_fileno(self):
623 rawio = self.MockRawIO()
624 bufio = self.tp(rawio)
625
Ezio Melotti2623a372010-11-21 13:34:58 +0000626 self.assertEqual(42, bufio.fileno())
Antoine Pitrou19690592009-06-12 20:14:08 +0000627
628 def test_no_fileno(self):
629 # XXX will we always have fileno() function? If so, kill
630 # this test. Else, write it.
631 pass
632
633 def test_invalid_args(self):
634 rawio = self.MockRawIO()
635 bufio = self.tp(rawio)
636 # Invalid whence
637 self.assertRaises(ValueError, bufio.seek, 0, -1)
638 self.assertRaises(ValueError, bufio.seek, 0, 3)
639
640 def test_override_destructor(self):
641 tp = self.tp
642 record = []
643 class MyBufferedIO(tp):
644 def __del__(self):
645 record.append(1)
646 try:
647 f = super(MyBufferedIO, self).__del__
648 except AttributeError:
649 pass
650 else:
651 f()
652 def close(self):
653 record.append(2)
654 super(MyBufferedIO, self).close()
655 def flush(self):
656 record.append(3)
657 super(MyBufferedIO, self).flush()
658 rawio = self.MockRawIO()
659 bufio = MyBufferedIO(rawio)
660 writable = bufio.writable()
661 del bufio
662 support.gc_collect()
663 if writable:
664 self.assertEqual(record, [1, 2, 3])
665 else:
666 self.assertEqual(record, [1, 2])
667
668 def test_context_manager(self):
669 # Test usability as a context manager
670 rawio = self.MockRawIO()
671 bufio = self.tp(rawio)
672 def _with():
673 with bufio:
674 pass
675 _with()
676 # bufio should now be closed, and using it a second time should raise
677 # a ValueError.
678 self.assertRaises(ValueError, _with)
679
680 def test_error_through_destructor(self):
681 # Test that the exception state is not modified by a destructor,
682 # even if close() fails.
683 rawio = self.CloseFailureIO()
684 def f():
685 self.tp(rawio).xyzzy
686 with support.captured_output("stderr") as s:
687 self.assertRaises(AttributeError, f)
688 s = s.getvalue().strip()
689 if s:
690 # The destructor *may* have printed an unraisable error, check it
691 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000692 self.assertTrue(s.startswith("Exception IOError: "), s)
693 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000694
695 def test_repr(self):
696 raw = self.MockRawIO()
697 b = self.tp(raw)
698 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
699 self.assertEqual(repr(b), "<%s>" % clsname)
700 raw.name = "dummy"
701 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
702 raw.name = b"dummy"
703 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000704
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000705 def test_flush_error_on_close(self):
706 raw = self.MockRawIO()
707 def bad_flush():
708 raise IOError()
709 raw.flush = bad_flush
710 b = self.tp(raw)
711 self.assertRaises(IOError, b.close) # exception not swallowed
712
713 def test_multi_close(self):
714 raw = self.MockRawIO()
715 b = self.tp(raw)
716 b.close()
717 b.close()
718 b.close()
719 self.assertRaises(ValueError, b.flush)
720
Antoine Pitroufc9ead62010-12-21 21:26:55 +0000721 def test_readonly_attributes(self):
722 raw = self.MockRawIO()
723 buf = self.tp(raw)
724 x = self.MockRawIO()
725 with self.assertRaises((AttributeError, TypeError)):
726 buf.raw = x
727
Christian Heimes1a6387e2008-03-26 12:49:49 +0000728
Antoine Pitrou19690592009-06-12 20:14:08 +0000729class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
730 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000731
Antoine Pitrou19690592009-06-12 20:14:08 +0000732 def test_constructor(self):
733 rawio = self.MockRawIO([b"abc"])
734 bufio = self.tp(rawio)
735 bufio.__init__(rawio)
736 bufio.__init__(rawio, buffer_size=1024)
737 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000738 self.assertEqual(b"abc", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000739 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
740 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
741 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
742 rawio = self.MockRawIO([b"abc"])
743 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000744 self.assertEqual(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000745
Antoine Pitrou19690592009-06-12 20:14:08 +0000746 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000747 for arg in (None, 7):
748 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
749 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000750 self.assertEqual(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000751 # Invalid args
752 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000753
Antoine Pitrou19690592009-06-12 20:14:08 +0000754 def test_read1(self):
755 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
756 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000757 self.assertEqual(b"a", bufio.read(1))
758 self.assertEqual(b"b", bufio.read1(1))
759 self.assertEqual(rawio._reads, 1)
760 self.assertEqual(b"c", bufio.read1(100))
761 self.assertEqual(rawio._reads, 1)
762 self.assertEqual(b"d", bufio.read1(100))
763 self.assertEqual(rawio._reads, 2)
764 self.assertEqual(b"efg", bufio.read1(100))
765 self.assertEqual(rawio._reads, 3)
766 self.assertEqual(b"", bufio.read1(100))
767 self.assertEqual(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000768 # Invalid args
769 self.assertRaises(ValueError, bufio.read1, -1)
770
771 def test_readinto(self):
772 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
773 bufio = self.tp(rawio)
774 b = bytearray(2)
Ezio Melotti2623a372010-11-21 13:34:58 +0000775 self.assertEqual(bufio.readinto(b), 2)
776 self.assertEqual(b, b"ab")
777 self.assertEqual(bufio.readinto(b), 2)
778 self.assertEqual(b, b"cd")
779 self.assertEqual(bufio.readinto(b), 2)
780 self.assertEqual(b, b"ef")
781 self.assertEqual(bufio.readinto(b), 1)
782 self.assertEqual(b, b"gf")
783 self.assertEqual(bufio.readinto(b), 0)
784 self.assertEqual(b, b"gf")
Antoine Pitrou19690592009-06-12 20:14:08 +0000785
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000786 def test_readlines(self):
787 def bufio():
788 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
789 return self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000790 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
791 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
792 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000793
Antoine Pitrou19690592009-06-12 20:14:08 +0000794 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000795 data = b"abcdefghi"
796 dlen = len(data)
797
798 tests = [
799 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
800 [ 100, [ 3, 3, 3], [ dlen ] ],
801 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
802 ]
803
804 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000805 rawio = self.MockFileIO(data)
806 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000807 pos = 0
808 for nbytes in buf_read_sizes:
Ezio Melotti2623a372010-11-21 13:34:58 +0000809 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Christian Heimes1a6387e2008-03-26 12:49:49 +0000810 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000811 # this is mildly implementation-dependent
Ezio Melotti2623a372010-11-21 13:34:58 +0000812 self.assertEqual(rawio.read_history, raw_read_sizes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000813
Antoine Pitrou19690592009-06-12 20:14:08 +0000814 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000815 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000816 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
817 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000818 self.assertEqual(b"abcd", bufio.read(6))
819 self.assertEqual(b"e", bufio.read(1))
820 self.assertEqual(b"fg", bufio.read())
821 self.assertEqual(b"", bufio.peek(1))
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200822 self.assertIsNone(bufio.read())
Ezio Melotti2623a372010-11-21 13:34:58 +0000823 self.assertEqual(b"", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000824
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200825 rawio = self.MockRawIO((b"a", None, None))
826 self.assertEqual(b"a", rawio.readall())
827 self.assertIsNone(rawio.readall())
828
Antoine Pitrou19690592009-06-12 20:14:08 +0000829 def test_read_past_eof(self):
830 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
831 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000832
Ezio Melotti2623a372010-11-21 13:34:58 +0000833 self.assertEqual(b"abcdefg", bufio.read(9000))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000834
Antoine Pitrou19690592009-06-12 20:14:08 +0000835 def test_read_all(self):
836 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
837 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000838
Ezio Melotti2623a372010-11-21 13:34:58 +0000839 self.assertEqual(b"abcdefg", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000840
Victor Stinner6a102812010-04-27 23:55:59 +0000841 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +0000842 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +0000843 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000844 try:
845 # Write out many bytes with exactly the same number of 0's,
846 # 1's... 255's. This will help us check that concurrent reading
847 # doesn't duplicate or forget contents.
848 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +0000849 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000850 random.shuffle(l)
851 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000852 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000853 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000854 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000855 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000856 errors = []
857 results = []
858 def f():
859 try:
860 # Intra-buffer read then buffer-flushing read
861 for n in cycle([1, 19]):
862 s = bufio.read(n)
863 if not s:
864 break
865 # list.append() is atomic
866 results.append(s)
867 except Exception as e:
868 errors.append(e)
869 raise
870 threads = [threading.Thread(target=f) for x in range(20)]
871 for t in threads:
872 t.start()
873 time.sleep(0.02) # yield
874 for t in threads:
875 t.join()
876 self.assertFalse(errors,
877 "the following exceptions were caught: %r" % errors)
878 s = b''.join(results)
879 for i in range(256):
880 c = bytes(bytearray([i]))
881 self.assertEqual(s.count(c), N)
882 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +0000883 support.unlink(support.TESTFN)
884
885 def test_misbehaved_io(self):
886 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
887 bufio = self.tp(rawio)
888 self.assertRaises(IOError, bufio.seek, 0)
889 self.assertRaises(IOError, bufio.tell)
890
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000891 def test_no_extraneous_read(self):
892 # Issue #9550; when the raw IO object has satisfied the read request,
893 # we should not issue any additional reads, otherwise it may block
894 # (e.g. socket).
895 bufsize = 16
896 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
897 rawio = self.MockRawIO([b"x" * n])
898 bufio = self.tp(rawio, bufsize)
899 self.assertEqual(bufio.read(n), b"x" * n)
900 # Simple case: one raw read is enough to satisfy the request.
901 self.assertEqual(rawio._extraneous_reads, 0,
902 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
903 # A more complex case where two raw reads are needed to satisfy
904 # the request.
905 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
906 bufio = self.tp(rawio, bufsize)
907 self.assertEqual(bufio.read(n), b"x" * n)
908 self.assertEqual(rawio._extraneous_reads, 0,
909 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
910
911
Antoine Pitrou19690592009-06-12 20:14:08 +0000912class CBufferedReaderTest(BufferedReaderTest):
913 tp = io.BufferedReader
914
915 def test_constructor(self):
916 BufferedReaderTest.test_constructor(self)
917 # The allocation can succeed on 32-bit builds, e.g. with more
918 # than 2GB RAM and a 64-bit kernel.
919 if sys.maxsize > 0x7FFFFFFF:
920 rawio = self.MockRawIO()
921 bufio = self.tp(rawio)
922 self.assertRaises((OverflowError, MemoryError, ValueError),
923 bufio.__init__, rawio, sys.maxsize)
924
925 def test_initialization(self):
926 rawio = self.MockRawIO([b"abc"])
927 bufio = self.tp(rawio)
928 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
929 self.assertRaises(ValueError, bufio.read)
930 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
931 self.assertRaises(ValueError, bufio.read)
932 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
933 self.assertRaises(ValueError, bufio.read)
934
935 def test_misbehaved_io_read(self):
936 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
937 bufio = self.tp(rawio)
938 # _pyio.BufferedReader seems to implement reading different, so that
939 # checking this is not so easy.
940 self.assertRaises(IOError, bufio.read, 10)
941
942 def test_garbage_collection(self):
943 # C BufferedReader objects are collected.
944 # The Python version has __del__, so it ends into gc.garbage instead
945 rawio = self.FileIO(support.TESTFN, "w+b")
946 f = self.tp(rawio)
947 f.f = f
948 wr = weakref.ref(f)
949 del f
950 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000951 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +0000952
953class PyBufferedReaderTest(BufferedReaderTest):
954 tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000955
956
Antoine Pitrou19690592009-06-12 20:14:08 +0000957class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
958 write_mode = "wb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000959
Antoine Pitrou19690592009-06-12 20:14:08 +0000960 def test_constructor(self):
961 rawio = self.MockRawIO()
962 bufio = self.tp(rawio)
963 bufio.__init__(rawio)
964 bufio.__init__(rawio, buffer_size=1024)
965 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000966 self.assertEqual(3, bufio.write(b"abc"))
Antoine Pitrou19690592009-06-12 20:14:08 +0000967 bufio.flush()
968 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
969 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
970 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
971 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000972 self.assertEqual(3, bufio.write(b"ghi"))
Antoine Pitrou19690592009-06-12 20:14:08 +0000973 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +0000974 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000975
Antoine Pitrou19690592009-06-12 20:14:08 +0000976 def test_detach_flush(self):
977 raw = self.MockRawIO()
978 buf = self.tp(raw)
979 buf.write(b"howdy!")
980 self.assertFalse(raw._write_stack)
981 buf.detach()
982 self.assertEqual(raw._write_stack, [b"howdy!"])
983
984 def test_write(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000985 # Write to the buffered IO but don't overflow the buffer.
Antoine Pitrou19690592009-06-12 20:14:08 +0000986 writer = self.MockRawIO()
987 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000988 bufio.write(b"abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000989 self.assertFalse(writer._write_stack)
990
Antoine Pitrou19690592009-06-12 20:14:08 +0000991 def test_write_overflow(self):
992 writer = self.MockRawIO()
993 bufio = self.tp(writer, 8)
994 contents = b"abcdefghijklmnop"
995 for n in range(0, len(contents), 3):
996 bufio.write(contents[n:n+3])
997 flushed = b"".join(writer._write_stack)
998 # At least (total - 8) bytes were implicitly flushed, perhaps more
999 # depending on the implementation.
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001000 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001001
Antoine Pitrou19690592009-06-12 20:14:08 +00001002 def check_writes(self, intermediate_func):
1003 # Lots of writes, test the flushed output is as expected.
1004 contents = bytes(range(256)) * 1000
1005 n = 0
1006 writer = self.MockRawIO()
1007 bufio = self.tp(writer, 13)
1008 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1009 def gen_sizes():
1010 for size in count(1):
1011 for i in range(15):
1012 yield size
1013 sizes = gen_sizes()
1014 while n < len(contents):
1015 size = min(next(sizes), len(contents) - n)
Ezio Melotti2623a372010-11-21 13:34:58 +00001016 self.assertEqual(bufio.write(contents[n:n+size]), size)
Antoine Pitrou19690592009-06-12 20:14:08 +00001017 intermediate_func(bufio)
1018 n += size
1019 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001020 self.assertEqual(contents,
Antoine Pitrou19690592009-06-12 20:14:08 +00001021 b"".join(writer._write_stack))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001022
Antoine Pitrou19690592009-06-12 20:14:08 +00001023 def test_writes(self):
1024 self.check_writes(lambda bufio: None)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001025
Antoine Pitrou19690592009-06-12 20:14:08 +00001026 def test_writes_and_flushes(self):
1027 self.check_writes(lambda bufio: bufio.flush())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001028
Antoine Pitrou19690592009-06-12 20:14:08 +00001029 def test_writes_and_seeks(self):
1030 def _seekabs(bufio):
1031 pos = bufio.tell()
1032 bufio.seek(pos + 1, 0)
1033 bufio.seek(pos - 1, 0)
1034 bufio.seek(pos, 0)
1035 self.check_writes(_seekabs)
1036 def _seekrel(bufio):
1037 pos = bufio.seek(0, 1)
1038 bufio.seek(+1, 1)
1039 bufio.seek(-1, 1)
1040 bufio.seek(pos, 0)
1041 self.check_writes(_seekrel)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001042
Antoine Pitrou19690592009-06-12 20:14:08 +00001043 def test_writes_and_truncates(self):
1044 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001045
Antoine Pitrou19690592009-06-12 20:14:08 +00001046 def test_write_non_blocking(self):
1047 raw = self.MockNonBlockWriterIO()
1048 bufio = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001049
Ezio Melotti2623a372010-11-21 13:34:58 +00001050 self.assertEqual(bufio.write(b"abcd"), 4)
1051 self.assertEqual(bufio.write(b"efghi"), 5)
Antoine Pitrou19690592009-06-12 20:14:08 +00001052 # 1 byte will be written, the rest will be buffered
1053 raw.block_on(b"k")
Ezio Melotti2623a372010-11-21 13:34:58 +00001054 self.assertEqual(bufio.write(b"jklmn"), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001055
Antoine Pitrou19690592009-06-12 20:14:08 +00001056 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1057 raw.block_on(b"0")
1058 try:
1059 bufio.write(b"opqrwxyz0123456789")
1060 except self.BlockingIOError as e:
1061 written = e.characters_written
1062 else:
1063 self.fail("BlockingIOError should have been raised")
Ezio Melotti2623a372010-11-21 13:34:58 +00001064 self.assertEqual(written, 16)
1065 self.assertEqual(raw.pop_written(),
Antoine Pitrou19690592009-06-12 20:14:08 +00001066 b"abcdefghijklmnopqrwxyz")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001067
Ezio Melotti2623a372010-11-21 13:34:58 +00001068 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Antoine Pitrou19690592009-06-12 20:14:08 +00001069 s = raw.pop_written()
1070 # Previously buffered bytes were flushed
1071 self.assertTrue(s.startswith(b"01234567A"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001072
Antoine Pitrou19690592009-06-12 20:14:08 +00001073 def test_write_and_rewind(self):
1074 raw = io.BytesIO()
1075 bufio = self.tp(raw, 4)
1076 self.assertEqual(bufio.write(b"abcdef"), 6)
1077 self.assertEqual(bufio.tell(), 6)
1078 bufio.seek(0, 0)
1079 self.assertEqual(bufio.write(b"XY"), 2)
1080 bufio.seek(6, 0)
1081 self.assertEqual(raw.getvalue(), b"XYcdef")
1082 self.assertEqual(bufio.write(b"123456"), 6)
1083 bufio.flush()
1084 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001085
Antoine Pitrou19690592009-06-12 20:14:08 +00001086 def test_flush(self):
1087 writer = self.MockRawIO()
1088 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001089 bufio.write(b"abc")
1090 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001091 self.assertEqual(b"abc", writer._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001092
Antoine Pitrou19690592009-06-12 20:14:08 +00001093 def test_destructor(self):
1094 writer = self.MockRawIO()
1095 bufio = self.tp(writer, 8)
1096 bufio.write(b"abc")
1097 del bufio
1098 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001099 self.assertEqual(b"abc", writer._write_stack[0])
Antoine Pitrou19690592009-06-12 20:14:08 +00001100
1101 def test_truncate(self):
1102 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001103 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001104 bufio = self.tp(raw, 8)
1105 bufio.write(b"abcdef")
1106 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001107 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001108 with self.open(support.TESTFN, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001109 self.assertEqual(f.read(), b"abc")
1110
Victor Stinner6a102812010-04-27 23:55:59 +00001111 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +00001112 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +00001113 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001114 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001115 # Write out many bytes from many threads and test they were
1116 # all flushed.
1117 N = 1000
1118 contents = bytes(range(256)) * N
1119 sizes = cycle([1, 19])
1120 n = 0
1121 queue = deque()
1122 while n < len(contents):
1123 size = next(sizes)
1124 queue.append(contents[n:n+size])
1125 n += size
1126 del contents
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001127 # We use a real file object because it allows us to
1128 # exercise situations where the GIL is released before
1129 # writing the buffer to the raw streams. This is in addition
1130 # to concurrency issues due to switching threads in the middle
1131 # of Python code.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001132 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001133 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001134 errors = []
1135 def f():
1136 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001137 while True:
1138 try:
1139 s = queue.popleft()
1140 except IndexError:
1141 return
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001142 bufio.write(s)
1143 except Exception as e:
1144 errors.append(e)
1145 raise
1146 threads = [threading.Thread(target=f) for x in range(20)]
1147 for t in threads:
1148 t.start()
1149 time.sleep(0.02) # yield
1150 for t in threads:
1151 t.join()
1152 self.assertFalse(errors,
1153 "the following exceptions were caught: %r" % errors)
Antoine Pitrou19690592009-06-12 20:14:08 +00001154 bufio.close()
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001155 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001156 s = f.read()
1157 for i in range(256):
Ezio Melotti2623a372010-11-21 13:34:58 +00001158 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001159 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001160 support.unlink(support.TESTFN)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001161
Antoine Pitrou19690592009-06-12 20:14:08 +00001162 def test_misbehaved_io(self):
1163 rawio = self.MisbehavedRawIO()
1164 bufio = self.tp(rawio, 5)
1165 self.assertRaises(IOError, bufio.seek, 0)
1166 self.assertRaises(IOError, bufio.tell)
1167 self.assertRaises(IOError, bufio.write, b"abcdef")
1168
1169 def test_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001170 with support.check_warnings(("max_buffer_size is deprecated",
1171 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001172 self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001173
1174
1175class CBufferedWriterTest(BufferedWriterTest):
1176 tp = io.BufferedWriter
1177
1178 def test_constructor(self):
1179 BufferedWriterTest.test_constructor(self)
1180 # The allocation can succeed on 32-bit builds, e.g. with more
1181 # than 2GB RAM and a 64-bit kernel.
1182 if sys.maxsize > 0x7FFFFFFF:
1183 rawio = self.MockRawIO()
1184 bufio = self.tp(rawio)
1185 self.assertRaises((OverflowError, MemoryError, ValueError),
1186 bufio.__init__, rawio, sys.maxsize)
1187
1188 def test_initialization(self):
1189 rawio = self.MockRawIO()
1190 bufio = self.tp(rawio)
1191 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1192 self.assertRaises(ValueError, bufio.write, b"def")
1193 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1194 self.assertRaises(ValueError, bufio.write, b"def")
1195 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1196 self.assertRaises(ValueError, bufio.write, b"def")
1197
1198 def test_garbage_collection(self):
1199 # C BufferedWriter objects are collected, and collecting them flushes
1200 # all data to disk.
1201 # The Python version has __del__, so it ends into gc.garbage instead
1202 rawio = self.FileIO(support.TESTFN, "w+b")
1203 f = self.tp(rawio)
1204 f.write(b"123xxx")
1205 f.x = f
1206 wr = weakref.ref(f)
1207 del f
1208 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001209 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001210 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001211 self.assertEqual(f.read(), b"123xxx")
1212
1213
1214class PyBufferedWriterTest(BufferedWriterTest):
1215 tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001216
1217class BufferedRWPairTest(unittest.TestCase):
1218
Antoine Pitrou19690592009-06-12 20:14:08 +00001219 def test_constructor(self):
1220 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001221 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001222
Antoine Pitrou19690592009-06-12 20:14:08 +00001223 def test_detach(self):
1224 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1225 self.assertRaises(self.UnsupportedOperation, pair.detach)
1226
1227 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001228 with support.check_warnings(("max_buffer_size is deprecated",
1229 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001230 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001231
1232 def test_constructor_with_not_readable(self):
1233 class NotReadable(MockRawIO):
1234 def readable(self):
1235 return False
1236
1237 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1238
1239 def test_constructor_with_not_writeable(self):
1240 class NotWriteable(MockRawIO):
1241 def writable(self):
1242 return False
1243
1244 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1245
1246 def test_read(self):
1247 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1248
1249 self.assertEqual(pair.read(3), b"abc")
1250 self.assertEqual(pair.read(1), b"d")
1251 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001252 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1253 self.assertEqual(pair.read(None), b"abc")
1254
1255 def test_readlines(self):
1256 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1257 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1258 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1259 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001260
1261 def test_read1(self):
1262 # .read1() is delegated to the underlying reader object, so this test
1263 # can be shallow.
1264 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1265
1266 self.assertEqual(pair.read1(3), b"abc")
1267
1268 def test_readinto(self):
1269 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1270
1271 data = bytearray(5)
1272 self.assertEqual(pair.readinto(data), 5)
1273 self.assertEqual(data, b"abcde")
1274
1275 def test_write(self):
1276 w = self.MockRawIO()
1277 pair = self.tp(self.MockRawIO(), w)
1278
1279 pair.write(b"abc")
1280 pair.flush()
1281 pair.write(b"def")
1282 pair.flush()
1283 self.assertEqual(w._write_stack, [b"abc", b"def"])
1284
1285 def test_peek(self):
1286 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1287
1288 self.assertTrue(pair.peek(3).startswith(b"abc"))
1289 self.assertEqual(pair.read(3), b"abc")
1290
1291 def test_readable(self):
1292 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1293 self.assertTrue(pair.readable())
1294
1295 def test_writeable(self):
1296 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1297 self.assertTrue(pair.writable())
1298
1299 def test_seekable(self):
1300 # BufferedRWPairs are never seekable, even if their readers and writers
1301 # are.
1302 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1303 self.assertFalse(pair.seekable())
1304
1305 # .flush() is delegated to the underlying writer object and has been
1306 # tested in the test_write method.
1307
1308 def test_close_and_closed(self):
1309 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1310 self.assertFalse(pair.closed)
1311 pair.close()
1312 self.assertTrue(pair.closed)
1313
1314 def test_isatty(self):
1315 class SelectableIsAtty(MockRawIO):
1316 def __init__(self, isatty):
1317 MockRawIO.__init__(self)
1318 self._isatty = isatty
1319
1320 def isatty(self):
1321 return self._isatty
1322
1323 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1324 self.assertFalse(pair.isatty())
1325
1326 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1327 self.assertTrue(pair.isatty())
1328
1329 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1330 self.assertTrue(pair.isatty())
1331
1332 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1333 self.assertTrue(pair.isatty())
1334
1335class CBufferedRWPairTest(BufferedRWPairTest):
1336 tp = io.BufferedRWPair
1337
1338class PyBufferedRWPairTest(BufferedRWPairTest):
1339 tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001340
1341
Antoine Pitrou19690592009-06-12 20:14:08 +00001342class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1343 read_mode = "rb+"
1344 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001345
Antoine Pitrou19690592009-06-12 20:14:08 +00001346 def test_constructor(self):
1347 BufferedReaderTest.test_constructor(self)
1348 BufferedWriterTest.test_constructor(self)
1349
1350 def test_read_and_write(self):
1351 raw = self.MockRawIO((b"asdf", b"ghjk"))
1352 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001353
1354 self.assertEqual(b"as", rw.read(2))
1355 rw.write(b"ddd")
1356 rw.write(b"eee")
1357 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001358 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001359 self.assertEqual(b"dddeee", raw._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001360
Antoine Pitrou19690592009-06-12 20:14:08 +00001361 def test_seek_and_tell(self):
1362 raw = self.BytesIO(b"asdfghjkl")
1363 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001364
Ezio Melotti2623a372010-11-21 13:34:58 +00001365 self.assertEqual(b"as", rw.read(2))
1366 self.assertEqual(2, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001367 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001368 self.assertEqual(b"asdf", rw.read(4))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001369
1370 rw.write(b"asdf")
1371 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001372 self.assertEqual(b"asdfasdfl", rw.read())
1373 self.assertEqual(9, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001374 rw.seek(-4, 2)
Ezio Melotti2623a372010-11-21 13:34:58 +00001375 self.assertEqual(5, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001376 rw.seek(2, 1)
Ezio Melotti2623a372010-11-21 13:34:58 +00001377 self.assertEqual(7, rw.tell())
1378 self.assertEqual(b"fl", rw.read(11))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001379 self.assertRaises(TypeError, rw.seek, 0.0)
1380
Antoine Pitrou19690592009-06-12 20:14:08 +00001381 def check_flush_and_read(self, read_func):
1382 raw = self.BytesIO(b"abcdefghi")
1383 bufio = self.tp(raw)
1384
Ezio Melotti2623a372010-11-21 13:34:58 +00001385 self.assertEqual(b"ab", read_func(bufio, 2))
Antoine Pitrou19690592009-06-12 20:14:08 +00001386 bufio.write(b"12")
Ezio Melotti2623a372010-11-21 13:34:58 +00001387 self.assertEqual(b"ef", read_func(bufio, 2))
1388 self.assertEqual(6, bufio.tell())
Antoine Pitrou19690592009-06-12 20:14:08 +00001389 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001390 self.assertEqual(6, bufio.tell())
1391 self.assertEqual(b"ghi", read_func(bufio))
Antoine Pitrou19690592009-06-12 20:14:08 +00001392 raw.seek(0, 0)
1393 raw.write(b"XYZ")
1394 # flush() resets the read buffer
1395 bufio.flush()
1396 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001397 self.assertEqual(b"XYZ", read_func(bufio, 3))
Antoine Pitrou19690592009-06-12 20:14:08 +00001398
1399 def test_flush_and_read(self):
1400 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1401
1402 def test_flush_and_readinto(self):
1403 def _readinto(bufio, n=-1):
1404 b = bytearray(n if n >= 0 else 9999)
1405 n = bufio.readinto(b)
1406 return bytes(b[:n])
1407 self.check_flush_and_read(_readinto)
1408
1409 def test_flush_and_peek(self):
1410 def _peek(bufio, n=-1):
1411 # This relies on the fact that the buffer can contain the whole
1412 # raw stream, otherwise peek() can return less.
1413 b = bufio.peek(n)
1414 if n != -1:
1415 b = b[:n]
1416 bufio.seek(len(b), 1)
1417 return b
1418 self.check_flush_and_read(_peek)
1419
1420 def test_flush_and_write(self):
1421 raw = self.BytesIO(b"abcdefghi")
1422 bufio = self.tp(raw)
1423
1424 bufio.write(b"123")
1425 bufio.flush()
1426 bufio.write(b"45")
1427 bufio.flush()
1428 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001429 self.assertEqual(b"12345fghi", raw.getvalue())
1430 self.assertEqual(b"12345fghi", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +00001431
1432 def test_threads(self):
1433 BufferedReaderTest.test_threads(self)
1434 BufferedWriterTest.test_threads(self)
1435
1436 def test_writes_and_peek(self):
1437 def _peek(bufio):
1438 bufio.peek(1)
1439 self.check_writes(_peek)
1440 def _peek(bufio):
1441 pos = bufio.tell()
1442 bufio.seek(-1, 1)
1443 bufio.peek(1)
1444 bufio.seek(pos, 0)
1445 self.check_writes(_peek)
1446
1447 def test_writes_and_reads(self):
1448 def _read(bufio):
1449 bufio.seek(-1, 1)
1450 bufio.read(1)
1451 self.check_writes(_read)
1452
1453 def test_writes_and_read1s(self):
1454 def _read1(bufio):
1455 bufio.seek(-1, 1)
1456 bufio.read1(1)
1457 self.check_writes(_read1)
1458
1459 def test_writes_and_readintos(self):
1460 def _read(bufio):
1461 bufio.seek(-1, 1)
1462 bufio.readinto(bytearray(1))
1463 self.check_writes(_read)
1464
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001465 def test_write_after_readahead(self):
1466 # Issue #6629: writing after the buffer was filled by readahead should
1467 # first rewind the raw stream.
1468 for overwrite_size in [1, 5]:
1469 raw = self.BytesIO(b"A" * 10)
1470 bufio = self.tp(raw, 4)
1471 # Trigger readahead
1472 self.assertEqual(bufio.read(1), b"A")
1473 self.assertEqual(bufio.tell(), 1)
1474 # Overwriting should rewind the raw stream if it needs so
1475 bufio.write(b"B" * overwrite_size)
1476 self.assertEqual(bufio.tell(), overwrite_size + 1)
1477 # If the write size was smaller than the buffer size, flush() and
1478 # check that rewind happens.
1479 bufio.flush()
1480 self.assertEqual(bufio.tell(), overwrite_size + 1)
1481 s = raw.getvalue()
1482 self.assertEqual(s,
1483 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1484
Antoine Pitrouee46a7b2011-05-13 00:31:52 +02001485 def test_write_rewind_write(self):
1486 # Various combinations of reading / writing / seeking backwards / writing again
1487 def mutate(bufio, pos1, pos2):
1488 assert pos2 >= pos1
1489 # Fill the buffer
1490 bufio.seek(pos1)
1491 bufio.read(pos2 - pos1)
1492 bufio.write(b'\x02')
1493 # This writes earlier than the previous write, but still inside
1494 # the buffer.
1495 bufio.seek(pos1)
1496 bufio.write(b'\x01')
1497
1498 b = b"\x80\x81\x82\x83\x84"
1499 for i in range(0, len(b)):
1500 for j in range(i, len(b)):
1501 raw = self.BytesIO(b)
1502 bufio = self.tp(raw, 100)
1503 mutate(bufio, i, j)
1504 bufio.flush()
1505 expected = bytearray(b)
1506 expected[j] = 2
1507 expected[i] = 1
1508 self.assertEqual(raw.getvalue(), expected,
1509 "failed result for i=%d, j=%d" % (i, j))
1510
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001511 def test_truncate_after_read_or_write(self):
1512 raw = self.BytesIO(b"A" * 10)
1513 bufio = self.tp(raw, 100)
1514 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1515 self.assertEqual(bufio.truncate(), 2)
1516 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1517 self.assertEqual(bufio.truncate(), 4)
1518
Antoine Pitrou19690592009-06-12 20:14:08 +00001519 def test_misbehaved_io(self):
1520 BufferedReaderTest.test_misbehaved_io(self)
1521 BufferedWriterTest.test_misbehaved_io(self)
1522
1523class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
1524 tp = io.BufferedRandom
1525
1526 def test_constructor(self):
1527 BufferedRandomTest.test_constructor(self)
1528 # The allocation can succeed on 32-bit builds, e.g. with more
1529 # than 2GB RAM and a 64-bit kernel.
1530 if sys.maxsize > 0x7FFFFFFF:
1531 rawio = self.MockRawIO()
1532 bufio = self.tp(rawio)
1533 self.assertRaises((OverflowError, MemoryError, ValueError),
1534 bufio.__init__, rawio, sys.maxsize)
1535
1536 def test_garbage_collection(self):
1537 CBufferedReaderTest.test_garbage_collection(self)
1538 CBufferedWriterTest.test_garbage_collection(self)
1539
1540class PyBufferedRandomTest(BufferedRandomTest):
1541 tp = pyio.BufferedRandom
1542
1543
Christian Heimes1a6387e2008-03-26 12:49:49 +00001544# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1545# properties:
1546# - A single output character can correspond to many bytes of input.
1547# - The number of input bytes to complete the character can be
1548# undetermined until the last input byte is received.
1549# - The number of input bytes can vary depending on previous input.
1550# - A single input byte can correspond to many characters of output.
1551# - The number of output characters can be undetermined until the
1552# last input byte is received.
1553# - The number of output characters can vary depending on previous input.
1554
1555class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1556 """
1557 For testing seek/tell behavior with a stateful, buffering decoder.
1558
1559 Input is a sequence of words. Words may be fixed-length (length set
1560 by input) or variable-length (period-terminated). In variable-length
1561 mode, extra periods are ignored. Possible words are:
1562 - 'i' followed by a number sets the input length, I (maximum 99).
1563 When I is set to 0, words are space-terminated.
1564 - 'o' followed by a number sets the output length, O (maximum 99).
1565 - Any other word is converted into a word followed by a period on
1566 the output. The output word consists of the input word truncated
1567 or padded out with hyphens to make its length equal to O. If O
1568 is 0, the word is output verbatim without truncating or padding.
1569 I and O are initially set to 1. When I changes, any buffered input is
1570 re-scanned according to the new I. EOF also terminates the last word.
1571 """
1572
1573 def __init__(self, errors='strict'):
1574 codecs.IncrementalDecoder.__init__(self, errors)
1575 self.reset()
1576
1577 def __repr__(self):
1578 return '<SID %x>' % id(self)
1579
1580 def reset(self):
1581 self.i = 1
1582 self.o = 1
1583 self.buffer = bytearray()
1584
1585 def getstate(self):
1586 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1587 return bytes(self.buffer), i*100 + o
1588
1589 def setstate(self, state):
1590 buffer, io = state
1591 self.buffer = bytearray(buffer)
1592 i, o = divmod(io, 100)
1593 self.i, self.o = i ^ 1, o ^ 1
1594
1595 def decode(self, input, final=False):
1596 output = ''
1597 for b in input:
1598 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001599 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +00001600 if self.buffer:
1601 output += self.process_word()
1602 else:
1603 self.buffer.append(b)
1604 else: # fixed-length, terminate after self.i bytes
1605 self.buffer.append(b)
1606 if len(self.buffer) == self.i:
1607 output += self.process_word()
1608 if final and self.buffer: # EOF terminates the last word
1609 output += self.process_word()
1610 return output
1611
1612 def process_word(self):
1613 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001614 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001615 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001616 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001617 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1618 else:
1619 output = self.buffer.decode('ascii')
1620 if len(output) < self.o:
1621 output += '-'*self.o # pad out with hyphens
1622 if self.o:
1623 output = output[:self.o] # truncate to output length
1624 output += '.'
1625 self.buffer = bytearray()
1626 return output
1627
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001628 codecEnabled = False
1629
1630 @classmethod
1631 def lookupTestDecoder(cls, name):
1632 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001633 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001634 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001635 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001636 incrementalencoder=None,
1637 streamreader=None, streamwriter=None,
1638 incrementaldecoder=cls)
1639
1640# Register the previous decoder for testing.
1641# Disabled by default, tests will enable it.
1642codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1643
1644
Christian Heimes1a6387e2008-03-26 12:49:49 +00001645class StatefulIncrementalDecoderTest(unittest.TestCase):
1646 """
1647 Make sure the StatefulIncrementalDecoder actually works.
1648 """
1649
1650 test_cases = [
1651 # I=1, O=1 (fixed-length input == fixed-length output)
1652 (b'abcd', False, 'a.b.c.d.'),
1653 # I=0, O=0 (variable-length input, variable-length output)
1654 (b'oiabcd', True, 'abcd.'),
1655 # I=0, O=0 (should ignore extra periods)
1656 (b'oi...abcd...', True, 'abcd.'),
1657 # I=0, O=6 (variable-length input, fixed-length output)
1658 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1659 # I=2, O=6 (fixed-length input < fixed-length output)
1660 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1661 # I=6, O=3 (fixed-length input > fixed-length output)
1662 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1663 # I=0, then 3; O=29, then 15 (with longer output)
1664 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1665 'a----------------------------.' +
1666 'b----------------------------.' +
1667 'cde--------------------------.' +
1668 'abcdefghijabcde.' +
1669 'a.b------------.' +
1670 '.c.------------.' +
1671 'd.e------------.' +
1672 'k--------------.' +
1673 'l--------------.' +
1674 'm--------------.')
1675 ]
1676
Antoine Pitrou19690592009-06-12 20:14:08 +00001677 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001678 # Try a few one-shot test cases.
1679 for input, eof, output in self.test_cases:
1680 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001681 self.assertEqual(d.decode(input, eof), output)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001682
1683 # Also test an unfinished decode, followed by forcing EOF.
1684 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001685 self.assertEqual(d.decode(b'oiabcd'), '')
1686 self.assertEqual(d.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001687
1688class TextIOWrapperTest(unittest.TestCase):
1689
1690 def setUp(self):
1691 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1692 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00001693 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001694
1695 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001696 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001697
Antoine Pitrou19690592009-06-12 20:14:08 +00001698 def test_constructor(self):
1699 r = self.BytesIO(b"\xc3\xa9\n\n")
1700 b = self.BufferedReader(r, 1000)
1701 t = self.TextIOWrapper(b)
1702 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti2623a372010-11-21 13:34:58 +00001703 self.assertEqual(t.encoding, "latin1")
1704 self.assertEqual(t.line_buffering, False)
Antoine Pitrou19690592009-06-12 20:14:08 +00001705 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti2623a372010-11-21 13:34:58 +00001706 self.assertEqual(t.encoding, "utf8")
1707 self.assertEqual(t.line_buffering, True)
1708 self.assertEqual("\xe9\n", t.readline())
Antoine Pitrou19690592009-06-12 20:14:08 +00001709 self.assertRaises(TypeError, t.__init__, b, newline=42)
1710 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1711
1712 def test_detach(self):
1713 r = self.BytesIO()
1714 b = self.BufferedWriter(r)
1715 t = self.TextIOWrapper(b)
1716 self.assertIs(t.detach(), b)
1717
1718 t = self.TextIOWrapper(b, encoding="ascii")
1719 t.write("howdy")
1720 self.assertFalse(r.getvalue())
1721 t.detach()
1722 self.assertEqual(r.getvalue(), b"howdy")
1723 self.assertRaises(ValueError, t.detach)
1724
1725 def test_repr(self):
1726 raw = self.BytesIO("hello".encode("utf-8"))
1727 b = self.BufferedReader(raw)
1728 t = self.TextIOWrapper(b, encoding="utf-8")
1729 modname = self.TextIOWrapper.__module__
1730 self.assertEqual(repr(t),
1731 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1732 raw.name = "dummy"
1733 self.assertEqual(repr(t),
1734 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1735 raw.name = b"dummy"
1736 self.assertEqual(repr(t),
1737 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1738
1739 def test_line_buffering(self):
1740 r = self.BytesIO()
1741 b = self.BufferedWriter(r, 1000)
1742 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1743 t.write("X")
Ezio Melotti2623a372010-11-21 13:34:58 +00001744 self.assertEqual(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001745 t.write("Y\nZ")
Ezio Melotti2623a372010-11-21 13:34:58 +00001746 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001747 t.write("A\rB")
Ezio Melotti2623a372010-11-21 13:34:58 +00001748 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001749
Antoine Pitrou19690592009-06-12 20:14:08 +00001750 def test_encoding(self):
1751 # Check the encoding attribute is always set, and valid
1752 b = self.BytesIO()
1753 t = self.TextIOWrapper(b, encoding="utf8")
1754 self.assertEqual(t.encoding, "utf8")
1755 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001756 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001757 codecs.lookup(t.encoding)
1758
1759 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001760 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001761 b = self.BytesIO(b"abc\n\xff\n")
1762 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001763 self.assertRaises(UnicodeError, t.read)
1764 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001765 b = self.BytesIO(b"abc\n\xff\n")
1766 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001767 self.assertRaises(UnicodeError, t.read)
1768 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001769 b = self.BytesIO(b"abc\n\xff\n")
1770 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti2623a372010-11-21 13:34:58 +00001771 self.assertEqual(t.read(), "abc\n\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001772 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001773 b = self.BytesIO(b"abc\n\xff\n")
1774 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti2623a372010-11-21 13:34:58 +00001775 self.assertEqual(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001776
Antoine Pitrou19690592009-06-12 20:14:08 +00001777 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001778 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001779 b = self.BytesIO()
1780 t = self.TextIOWrapper(b, encoding="ascii")
1781 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001782 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001783 b = self.BytesIO()
1784 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1785 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001786 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001787 b = self.BytesIO()
1788 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001789 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001790 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001791 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001792 self.assertEqual(b.getvalue(), b"abcdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001793 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001794 b = self.BytesIO()
1795 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001796 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001797 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001798 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001799 self.assertEqual(b.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001800
Antoine Pitrou19690592009-06-12 20:14:08 +00001801 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001802 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1803
1804 tests = [
1805 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1806 [ '', input_lines ],
1807 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1808 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1809 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1810 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001811 encodings = (
1812 'utf-8', 'latin-1',
1813 'utf-16', 'utf-16-le', 'utf-16-be',
1814 'utf-32', 'utf-32-le', 'utf-32-be',
1815 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00001816
1817 # Try a range of buffer sizes to test the case where \r is the last
1818 # character in TextIOWrapper._pending_line.
1819 for encoding in encodings:
1820 # XXX: str.encode() should return bytes
1821 data = bytes(''.join(input_lines).encode(encoding))
1822 for do_reads in (False, True):
1823 for bufsize in range(1, 10):
1824 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001825 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1826 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00001827 encoding=encoding)
1828 if do_reads:
1829 got_lines = []
1830 while True:
1831 c2 = textio.read(2)
1832 if c2 == '':
1833 break
Ezio Melotti2623a372010-11-21 13:34:58 +00001834 self.assertEqual(len(c2), 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001835 got_lines.append(c2 + textio.readline())
1836 else:
1837 got_lines = list(textio)
1838
1839 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti2623a372010-11-21 13:34:58 +00001840 self.assertEqual(got_line, exp_line)
1841 self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001842
Antoine Pitrou19690592009-06-12 20:14:08 +00001843 def test_newlines_input(self):
1844 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001845 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1846 for newline, expected in [
1847 (None, normalized.decode("ascii").splitlines(True)),
1848 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00001849 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1850 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1851 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00001852 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00001853 buf = self.BytesIO(testdata)
1854 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti2623a372010-11-21 13:34:58 +00001855 self.assertEqual(txt.readlines(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001856 txt.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001857 self.assertEqual(txt.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001858
Antoine Pitrou19690592009-06-12 20:14:08 +00001859 def test_newlines_output(self):
1860 testdict = {
1861 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1862 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1863 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1864 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1865 }
1866 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1867 for newline, expected in tests:
1868 buf = self.BytesIO()
1869 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1870 txt.write("AAA\nB")
1871 txt.write("BB\nCCC\n")
1872 txt.write("X\rY\r\nZ")
1873 txt.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001874 self.assertEqual(buf.closed, False)
1875 self.assertEqual(buf.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00001876
1877 def test_destructor(self):
1878 l = []
1879 base = self.BytesIO
1880 class MyBytesIO(base):
1881 def close(self):
1882 l.append(self.getvalue())
1883 base.close(self)
1884 b = MyBytesIO()
1885 t = self.TextIOWrapper(b, encoding="ascii")
1886 t.write("abc")
1887 del t
1888 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001889 self.assertEqual([b"abc"], l)
Antoine Pitrou19690592009-06-12 20:14:08 +00001890
1891 def test_override_destructor(self):
1892 record = []
1893 class MyTextIO(self.TextIOWrapper):
1894 def __del__(self):
1895 record.append(1)
1896 try:
1897 f = super(MyTextIO, self).__del__
1898 except AttributeError:
1899 pass
1900 else:
1901 f()
1902 def close(self):
1903 record.append(2)
1904 super(MyTextIO, self).close()
1905 def flush(self):
1906 record.append(3)
1907 super(MyTextIO, self).flush()
1908 b = self.BytesIO()
1909 t = MyTextIO(b, encoding="ascii")
1910 del t
1911 support.gc_collect()
1912 self.assertEqual(record, [1, 2, 3])
1913
1914 def test_error_through_destructor(self):
1915 # Test that the exception state is not modified by a destructor,
1916 # even if close() fails.
1917 rawio = self.CloseFailureIO()
1918 def f():
1919 self.TextIOWrapper(rawio).xyzzy
1920 with support.captured_output("stderr") as s:
1921 self.assertRaises(AttributeError, f)
1922 s = s.getvalue().strip()
1923 if s:
1924 # The destructor *may* have printed an unraisable error, check it
1925 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001926 self.assertTrue(s.startswith("Exception IOError: "), s)
1927 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001928
1929 # Systematic tests of the text I/O API
1930
Antoine Pitrou19690592009-06-12 20:14:08 +00001931 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001932 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1933 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00001934 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001935 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00001936 self.assertEqual(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001937 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001938 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001939 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00001940 self.assertEqual(f.tell(), 0)
1941 self.assertEqual(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001942 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00001943 self.assertEqual(f.seek(0), 0)
1944 self.assertEqual(f.read(None), "abc")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001945 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001946 self.assertEqual(f.read(2), "ab")
1947 self.assertEqual(f.read(1), "c")
1948 self.assertEqual(f.read(1), "")
1949 self.assertEqual(f.read(), "")
1950 self.assertEqual(f.tell(), cookie)
1951 self.assertEqual(f.seek(0), 0)
1952 self.assertEqual(f.seek(0, 2), cookie)
1953 self.assertEqual(f.write("def"), 3)
1954 self.assertEqual(f.seek(cookie), cookie)
1955 self.assertEqual(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001956 if enc.startswith("utf"):
1957 self.multi_line_test(f, enc)
1958 f.close()
1959
1960 def multi_line_test(self, f, enc):
1961 f.seek(0)
1962 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00001963 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001964 wlines = []
1965 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
1966 chars = []
1967 for i in range(size):
1968 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00001969 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001970 wlines.append((f.tell(), line))
1971 f.write(line)
1972 f.seek(0)
1973 rlines = []
1974 while True:
1975 pos = f.tell()
1976 line = f.readline()
1977 if not line:
1978 break
1979 rlines.append((pos, line))
Ezio Melotti2623a372010-11-21 13:34:58 +00001980 self.assertEqual(rlines, wlines)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001981
Antoine Pitrou19690592009-06-12 20:14:08 +00001982 def test_telling(self):
1983 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001984 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001985 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001986 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001987 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001988 p2 = f.tell()
1989 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001990 self.assertEqual(f.tell(), p0)
1991 self.assertEqual(f.readline(), "\xff\n")
1992 self.assertEqual(f.tell(), p1)
1993 self.assertEqual(f.readline(), "\xff\n")
1994 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001995 f.seek(0)
1996 for line in f:
Ezio Melotti2623a372010-11-21 13:34:58 +00001997 self.assertEqual(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001998 self.assertRaises(IOError, f.tell)
Ezio Melotti2623a372010-11-21 13:34:58 +00001999 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002000 f.close()
2001
Antoine Pitrou19690592009-06-12 20:14:08 +00002002 def test_seeking(self):
2003 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00002004 prefix_size = chunk_size - 2
2005 u_prefix = "a" * prefix_size
2006 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melotti2623a372010-11-21 13:34:58 +00002007 self.assertEqual(len(u_prefix), len(prefix))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002008 u_suffix = "\u8888\n"
2009 suffix = bytes(u_suffix.encode("utf-8"))
2010 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00002011 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002012 f.write(line*2)
2013 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002014 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002015 s = f.read(prefix_size)
Ezio Melotti2623a372010-11-21 13:34:58 +00002016 self.assertEqual(s, prefix.decode("ascii"))
2017 self.assertEqual(f.tell(), prefix_size)
2018 self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002019
Antoine Pitrou19690592009-06-12 20:14:08 +00002020 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002021 # Regression test for a specific bug
2022 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00002023 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002024 f.write(data)
2025 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002026 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002027 f._CHUNK_SIZE # Just test that it exists
2028 f._CHUNK_SIZE = 2
2029 f.readline()
2030 f.tell()
2031
Antoine Pitrou19690592009-06-12 20:14:08 +00002032 def test_seek_and_tell(self):
2033 #Test seek/tell using the StatefulIncrementalDecoder.
2034 # Make test faster by doing smaller seeks
2035 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00002036
Antoine Pitrou19690592009-06-12 20:14:08 +00002037 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002038 """Tell/seek to various points within a data stream and ensure
2039 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00002040 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00002041 f.write(data)
2042 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002043 f = self.open(support.TESTFN, encoding='test_decoder')
2044 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00002045 decoded = f.read()
2046 f.close()
2047
2048 for i in range(min_pos, len(decoded) + 1): # seek positions
2049 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00002050 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melotti2623a372010-11-21 13:34:58 +00002051 self.assertEqual(f.read(i), decoded[:i])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002052 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002053 self.assertEqual(f.read(j), decoded[i:i + j])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002054 f.seek(cookie)
Ezio Melotti2623a372010-11-21 13:34:58 +00002055 self.assertEqual(f.read(), decoded[i:])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002056 f.close()
2057
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002058 # Enable the test decoder.
2059 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00002060
2061 # Run the tests.
2062 try:
2063 # Try each test case.
2064 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00002065 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002066
2067 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00002068 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2069 offset = CHUNK_SIZE - len(input)//2
2070 prefix = b'.'*offset
2071 # Don't bother seeking into the prefix (takes too long).
2072 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00002073 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002074
2075 # Ensure our test decoder won't interfere with subsequent tests.
2076 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002077 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002078
Antoine Pitrou19690592009-06-12 20:14:08 +00002079 def test_encoded_writes(self):
2080 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002081 tests = ("utf-16",
2082 "utf-16-le",
2083 "utf-16-be",
2084 "utf-32",
2085 "utf-32-le",
2086 "utf-32-be")
2087 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002088 buf = self.BytesIO()
2089 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002090 # Check if the BOM is written only once (see issue1753).
2091 f.write(data)
2092 f.write(data)
2093 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002094 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002095 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002096 self.assertEqual(f.read(), data * 2)
2097 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002098
Antoine Pitrou19690592009-06-12 20:14:08 +00002099 def test_unreadable(self):
2100 class UnReadable(self.BytesIO):
2101 def readable(self):
2102 return False
2103 txt = self.TextIOWrapper(UnReadable())
2104 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002105
Antoine Pitrou19690592009-06-12 20:14:08 +00002106 def test_read_one_by_one(self):
2107 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002108 reads = ""
2109 while True:
2110 c = txt.read(1)
2111 if not c:
2112 break
2113 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002114 self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002115
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002116 def test_readlines(self):
2117 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2118 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2119 txt.seek(0)
2120 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2121 txt.seek(0)
2122 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2123
Christian Heimes1a6387e2008-03-26 12:49:49 +00002124 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002125 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002126 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002127 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002128 reads = ""
2129 while True:
2130 c = txt.read(128)
2131 if not c:
2132 break
2133 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002134 self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002135
2136 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002137 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002138
2139 # read one char at a time
2140 reads = ""
2141 while True:
2142 c = txt.read(1)
2143 if not c:
2144 break
2145 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002146 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002147
2148 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002149 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002150 txt._CHUNK_SIZE = 4
2151
2152 reads = ""
2153 while True:
2154 c = txt.read(4)
2155 if not c:
2156 break
2157 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002158 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002159
2160 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002161 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002162 txt._CHUNK_SIZE = 4
2163
2164 reads = txt.read(4)
2165 reads += txt.read(4)
2166 reads += txt.readline()
2167 reads += txt.readline()
2168 reads += txt.readline()
Ezio Melotti2623a372010-11-21 13:34:58 +00002169 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002170
2171 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002172 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002173 txt._CHUNK_SIZE = 4
2174
2175 reads = txt.read(4)
2176 reads += txt.read()
Ezio Melotti2623a372010-11-21 13:34:58 +00002177 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002178
2179 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002180 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002181 txt._CHUNK_SIZE = 4
2182
2183 reads = txt.read(4)
2184 pos = txt.tell()
2185 txt.seek(0)
2186 txt.seek(pos)
Ezio Melotti2623a372010-11-21 13:34:58 +00002187 self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002188
2189 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002190 buffer = self.BytesIO(self.testdata)
2191 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002192
2193 self.assertEqual(buffer.seekable(), txt.seekable())
2194
Antoine Pitrou19690592009-06-12 20:14:08 +00002195 def test_append_bom(self):
2196 # The BOM is not written again when appending to a non-empty file
2197 filename = support.TESTFN
2198 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2199 with self.open(filename, 'w', encoding=charset) as f:
2200 f.write('aaa')
2201 pos = f.tell()
2202 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002203 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002204
2205 with self.open(filename, 'a', encoding=charset) as f:
2206 f.write('xxx')
2207 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002208 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002209
Antoine Pitrou19690592009-06-12 20:14:08 +00002210 def test_seek_bom(self):
2211 # Same test, but when seeking manually
2212 filename = support.TESTFN
2213 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2214 with self.open(filename, 'w', encoding=charset) as f:
2215 f.write('aaa')
2216 pos = f.tell()
2217 with self.open(filename, 'r+', encoding=charset) as f:
2218 f.seek(pos)
2219 f.write('zzz')
2220 f.seek(0)
2221 f.write('bbb')
2222 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002223 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002224
2225 def test_errors_property(self):
2226 with self.open(support.TESTFN, "w") as f:
2227 self.assertEqual(f.errors, "strict")
2228 with self.open(support.TESTFN, "w", errors="replace") as f:
2229 self.assertEqual(f.errors, "replace")
2230
Victor Stinner6a102812010-04-27 23:55:59 +00002231 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002232 def test_threads_write(self):
2233 # Issue6750: concurrent writes could duplicate data
2234 event = threading.Event()
2235 with self.open(support.TESTFN, "w", buffering=1) as f:
2236 def run(n):
2237 text = "Thread%03d\n" % n
2238 event.wait()
2239 f.write(text)
2240 threads = [threading.Thread(target=lambda n=x: run(n))
2241 for x in range(20)]
2242 for t in threads:
2243 t.start()
2244 time.sleep(0.02)
2245 event.set()
2246 for t in threads:
2247 t.join()
2248 with self.open(support.TESTFN) as f:
2249 content = f.read()
2250 for n in range(20):
Ezio Melotti2623a372010-11-21 13:34:58 +00002251 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002252
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002253 def test_flush_error_on_close(self):
2254 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2255 def bad_flush():
2256 raise IOError()
2257 txt.flush = bad_flush
2258 self.assertRaises(IOError, txt.close) # exception not swallowed
2259
2260 def test_multi_close(self):
2261 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2262 txt.close()
2263 txt.close()
2264 txt.close()
2265 self.assertRaises(ValueError, txt.flush)
2266
Antoine Pitroufc9ead62010-12-21 21:26:55 +00002267 def test_readonly_attributes(self):
2268 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2269 buf = self.BytesIO(self.testdata)
2270 with self.assertRaises((AttributeError, TypeError)):
2271 txt.buffer = buf
2272
Antoine Pitrou19690592009-06-12 20:14:08 +00002273class CTextIOWrapperTest(TextIOWrapperTest):
2274
2275 def test_initialization(self):
2276 r = self.BytesIO(b"\xc3\xa9\n\n")
2277 b = self.BufferedReader(r, 1000)
2278 t = self.TextIOWrapper(b)
2279 self.assertRaises(TypeError, t.__init__, b, newline=42)
2280 self.assertRaises(ValueError, t.read)
2281 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2282 self.assertRaises(ValueError, t.read)
2283
2284 def test_garbage_collection(self):
2285 # C TextIOWrapper objects are collected, and collecting them flushes
2286 # all data to disk.
2287 # The Python version has __del__, so it ends in gc.garbage instead.
2288 rawio = io.FileIO(support.TESTFN, "wb")
2289 b = self.BufferedWriter(rawio)
2290 t = self.TextIOWrapper(b, encoding="ascii")
2291 t.write("456def")
2292 t.x = t
2293 wr = weakref.ref(t)
2294 del t
2295 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002296 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002297 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002298 self.assertEqual(f.read(), b"456def")
2299
2300class PyTextIOWrapperTest(TextIOWrapperTest):
2301 pass
2302
2303
2304class IncrementalNewlineDecoderTest(unittest.TestCase):
2305
2306 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002307 # UTF-8 specific tests for a newline decoder
2308 def _check_decode(b, s, **kwargs):
2309 # We exercise getstate() / setstate() as well as decode()
2310 state = decoder.getstate()
Ezio Melotti2623a372010-11-21 13:34:58 +00002311 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002312 decoder.setstate(state)
Ezio Melotti2623a372010-11-21 13:34:58 +00002313 self.assertEqual(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002314
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002315 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002316
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002317 _check_decode(b'\xe8', "")
2318 _check_decode(b'\xa2', "")
2319 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002320
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002321 _check_decode(b'\xe8', "")
2322 _check_decode(b'\xa2', "")
2323 _check_decode(b'\x88', "\u8888")
2324
2325 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002326 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2327
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002328 decoder.reset()
2329 _check_decode(b'\n', "\n")
2330 _check_decode(b'\r', "")
2331 _check_decode(b'', "\n", final=True)
2332 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002333
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002334 _check_decode(b'\r', "")
2335 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002336
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002337 _check_decode(b'\r\r\n', "\n\n")
2338 _check_decode(b'\r', "")
2339 _check_decode(b'\r', "\n")
2340 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002341
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002342 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2343 _check_decode(b'\xe8\xa2\x88', "\u8888")
2344 _check_decode(b'\n', "\n")
2345 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2346 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002347
Antoine Pitrou19690592009-06-12 20:14:08 +00002348 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002349 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002350 if encoding is not None:
2351 encoder = codecs.getincrementalencoder(encoding)()
2352 def _decode_bytewise(s):
2353 # Decode one byte at a time
2354 for b in encoder.encode(s):
2355 result.append(decoder.decode(b))
2356 else:
2357 encoder = None
2358 def _decode_bytewise(s):
2359 # Decode one char at a time
2360 for c in s:
2361 result.append(decoder.decode(c))
Ezio Melotti2623a372010-11-21 13:34:58 +00002362 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002363 _decode_bytewise("abc\n\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002364 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002365 _decode_bytewise("\nabc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002366 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002367 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002368 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002369 _decode_bytewise("abc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002370 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002371 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002372 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002373 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002374 input = "abc"
2375 if encoder is not None:
2376 encoder.reset()
2377 input = encoder.encode(input)
Ezio Melotti2623a372010-11-21 13:34:58 +00002378 self.assertEqual(decoder.decode(input), "abc")
2379 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002380
2381 def test_newline_decoder(self):
2382 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002383 # None meaning the IncrementalNewlineDecoder takes unicode input
2384 # rather than bytes input
2385 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002386 'utf-16', 'utf-16-le', 'utf-16-be',
2387 'utf-32', 'utf-32-le', 'utf-32-be',
2388 )
2389 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002390 decoder = enc and codecs.getincrementaldecoder(enc)()
2391 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2392 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002393 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002394 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2395 self.check_newline_decoding_utf8(decoder)
2396
2397 def test_newline_bytes(self):
2398 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2399 def _check(dec):
Ezio Melotti2623a372010-11-21 13:34:58 +00002400 self.assertEqual(dec.newlines, None)
2401 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2402 self.assertEqual(dec.newlines, None)
2403 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2404 self.assertEqual(dec.newlines, None)
Antoine Pitrou19690592009-06-12 20:14:08 +00002405 dec = self.IncrementalNewlineDecoder(None, translate=False)
2406 _check(dec)
2407 dec = self.IncrementalNewlineDecoder(None, translate=True)
2408 _check(dec)
2409
2410class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2411 pass
2412
2413class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2414 pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002415
Christian Heimes1a6387e2008-03-26 12:49:49 +00002416
2417# XXX Tests for open()
2418
2419class MiscIOTest(unittest.TestCase):
2420
Benjamin Petersonad100c32008-11-20 22:06:22 +00002421 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002422 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002423
Antoine Pitrou19690592009-06-12 20:14:08 +00002424 def test___all__(self):
2425 for name in self.io.__all__:
2426 obj = getattr(self.io, name, None)
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002427 self.assertTrue(obj is not None, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002428 if name == "open":
2429 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002430 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002431 self.assertTrue(issubclass(obj, Exception), name)
2432 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002433 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002434
Benjamin Petersonad100c32008-11-20 22:06:22 +00002435 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002436 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002437 self.assertEqual(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002438 f.close()
2439
Antoine Pitrou19690592009-06-12 20:14:08 +00002440 f = self.open(support.TESTFN, "U")
Ezio Melotti2623a372010-11-21 13:34:58 +00002441 self.assertEqual(f.name, support.TESTFN)
2442 self.assertEqual(f.buffer.name, support.TESTFN)
2443 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2444 self.assertEqual(f.mode, "U")
2445 self.assertEqual(f.buffer.mode, "rb")
2446 self.assertEqual(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002447 f.close()
2448
Antoine Pitrou19690592009-06-12 20:14:08 +00002449 f = self.open(support.TESTFN, "w+")
Ezio Melotti2623a372010-11-21 13:34:58 +00002450 self.assertEqual(f.mode, "w+")
2451 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2452 self.assertEqual(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002453
Antoine Pitrou19690592009-06-12 20:14:08 +00002454 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melotti2623a372010-11-21 13:34:58 +00002455 self.assertEqual(g.mode, "wb")
2456 self.assertEqual(g.raw.mode, "wb")
2457 self.assertEqual(g.name, f.fileno())
2458 self.assertEqual(g.raw.name, f.fileno())
Benjamin Petersonad100c32008-11-20 22:06:22 +00002459 f.close()
2460 g.close()
2461
Antoine Pitrou19690592009-06-12 20:14:08 +00002462 def test_io_after_close(self):
2463 for kwargs in [
2464 {"mode": "w"},
2465 {"mode": "wb"},
2466 {"mode": "w", "buffering": 1},
2467 {"mode": "w", "buffering": 2},
2468 {"mode": "wb", "buffering": 0},
2469 {"mode": "r"},
2470 {"mode": "rb"},
2471 {"mode": "r", "buffering": 1},
2472 {"mode": "r", "buffering": 2},
2473 {"mode": "rb", "buffering": 0},
2474 {"mode": "w+"},
2475 {"mode": "w+b"},
2476 {"mode": "w+", "buffering": 1},
2477 {"mode": "w+", "buffering": 2},
2478 {"mode": "w+b", "buffering": 0},
2479 ]:
2480 f = self.open(support.TESTFN, **kwargs)
2481 f.close()
2482 self.assertRaises(ValueError, f.flush)
2483 self.assertRaises(ValueError, f.fileno)
2484 self.assertRaises(ValueError, f.isatty)
2485 self.assertRaises(ValueError, f.__iter__)
2486 if hasattr(f, "peek"):
2487 self.assertRaises(ValueError, f.peek, 1)
2488 self.assertRaises(ValueError, f.read)
2489 if hasattr(f, "read1"):
2490 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinner5100a402011-05-25 22:15:36 +02002491 if hasattr(f, "readall"):
2492 self.assertRaises(ValueError, f.readall)
Antoine Pitrou19690592009-06-12 20:14:08 +00002493 if hasattr(f, "readinto"):
2494 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2495 self.assertRaises(ValueError, f.readline)
2496 self.assertRaises(ValueError, f.readlines)
2497 self.assertRaises(ValueError, f.seek, 0)
2498 self.assertRaises(ValueError, f.tell)
2499 self.assertRaises(ValueError, f.truncate)
2500 self.assertRaises(ValueError, f.write,
2501 b"" if "b" in kwargs['mode'] else "")
2502 self.assertRaises(ValueError, f.writelines, [])
2503 self.assertRaises(ValueError, next, f)
2504
2505 def test_blockingioerror(self):
2506 # Various BlockingIOError issues
2507 self.assertRaises(TypeError, self.BlockingIOError)
2508 self.assertRaises(TypeError, self.BlockingIOError, 1)
2509 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2510 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2511 b = self.BlockingIOError(1, "")
2512 self.assertEqual(b.characters_written, 0)
2513 class C(unicode):
2514 pass
2515 c = C("")
2516 b = self.BlockingIOError(1, c)
2517 c.b = b
2518 b.c = c
2519 wr = weakref.ref(c)
2520 del c, b
2521 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002522 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002523
2524 def test_abcs(self):
2525 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002526 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2527 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2528 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2529 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002530
2531 def _check_abc_inheritance(self, abcmodule):
2532 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002533 self.assertIsInstance(f, abcmodule.IOBase)
2534 self.assertIsInstance(f, abcmodule.RawIOBase)
2535 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2536 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002537 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002538 self.assertIsInstance(f, abcmodule.IOBase)
2539 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2540 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2541 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002542 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002543 self.assertIsInstance(f, abcmodule.IOBase)
2544 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2545 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2546 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002547
2548 def test_abc_inheritance(self):
2549 # Test implementations inherit from their respective ABCs
2550 self._check_abc_inheritance(self)
2551
2552 def test_abc_inheritance_official(self):
2553 # Test implementations inherit from the official ABCs of the
2554 # baseline "io" module.
2555 self._check_abc_inheritance(io)
2556
2557class CMiscIOTest(MiscIOTest):
2558 io = io
2559
2560class PyMiscIOTest(MiscIOTest):
2561 io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002562
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002563
2564@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2565class SignalsTest(unittest.TestCase):
2566
2567 def setUp(self):
2568 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2569
2570 def tearDown(self):
2571 signal.signal(signal.SIGALRM, self.oldalrm)
2572
2573 def alarm_interrupt(self, sig, frame):
Florent Xicluna3fa3b002010-09-13 08:20:19 +00002574 1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002575
2576 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinner49d495f2011-07-04 11:44:46 +02002577 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
2578 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002579 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2580 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou6439c002011-02-25 21:35:47 +00002581 invokes the signal handler, and bubbles up the exception raised
2582 in the latter."""
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002583 read_results = []
2584 def _read():
2585 s = os.read(r, 1)
2586 read_results.append(s)
2587 t = threading.Thread(target=_read)
2588 t.daemon = True
2589 r, w = os.pipe()
2590 try:
2591 wio = self.io.open(w, **fdopen_kwargs)
2592 t.start()
2593 signal.alarm(1)
2594 # Fill the pipe enough that the write will be blocking.
2595 # It will be interrupted by the timer armed above. Since the
2596 # other thread has read one byte, the low-level write will
2597 # return with a successful (partial) result rather than an EINTR.
2598 # The buffered IO layer must check for pending signal
2599 # handlers, which in this case will invoke alarm_interrupt().
2600 self.assertRaises(ZeroDivisionError,
2601 wio.write, item * (1024 * 1024))
2602 t.join()
2603 # We got one byte, get another one and check that it isn't a
2604 # repeat of the first one.
2605 read_results.append(os.read(r, 1))
2606 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2607 finally:
2608 os.close(w)
2609 os.close(r)
2610 # This is deliberate. If we didn't close the file descriptor
2611 # before closing wio, wio would try to flush its internal
2612 # buffer, and block again.
2613 try:
2614 wio.close()
2615 except IOError as e:
2616 if e.errno != errno.EBADF:
2617 raise
2618
2619 def test_interrupted_write_unbuffered(self):
2620 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2621
2622 def test_interrupted_write_buffered(self):
2623 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2624
2625 def test_interrupted_write_text(self):
2626 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2627
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002628 def check_reentrant_write(self, data, **fdopen_kwargs):
2629 def on_alarm(*args):
2630 # Will be called reentrantly from the same thread
2631 wio.write(data)
Victor Stinner4c41f842011-07-05 13:29:26 +02002632 1//0
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002633 signal.signal(signal.SIGALRM, on_alarm)
2634 r, w = os.pipe()
2635 wio = self.io.open(w, **fdopen_kwargs)
2636 try:
2637 signal.alarm(1)
2638 # Either the reentrant call to wio.write() fails with RuntimeError,
2639 # or the signal handler raises ZeroDivisionError.
2640 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2641 while 1:
2642 for i in range(100):
2643 wio.write(data)
2644 wio.flush()
2645 # Make sure the buffer doesn't fill up and block further writes
2646 os.read(r, len(data) * 100)
2647 exc = cm.exception
2648 if isinstance(exc, RuntimeError):
2649 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2650 finally:
2651 wio.close()
2652 os.close(r)
2653
2654 def test_reentrant_write_buffered(self):
2655 self.check_reentrant_write(b"xy", mode="wb")
2656
2657 def test_reentrant_write_text(self):
2658 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2659
Antoine Pitrou6439c002011-02-25 21:35:47 +00002660 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2661 """Check that a buffered read, when it gets interrupted (either
2662 returning a partial result or EINTR), properly invokes the signal
2663 handler and retries if the latter returned successfully."""
2664 r, w = os.pipe()
2665 fdopen_kwargs["closefd"] = False
2666 def alarm_handler(sig, frame):
2667 os.write(w, b"bar")
2668 signal.signal(signal.SIGALRM, alarm_handler)
2669 try:
2670 rio = self.io.open(r, **fdopen_kwargs)
2671 os.write(w, b"foo")
2672 signal.alarm(1)
2673 # Expected behaviour:
2674 # - first raw read() returns partial b"foo"
2675 # - second raw read() returns EINTR
2676 # - third raw read() returns b"bar"
2677 self.assertEqual(decode(rio.read(6)), "foobar")
2678 finally:
2679 rio.close()
2680 os.close(w)
2681 os.close(r)
2682
2683 def test_interrupterd_read_retry_buffered(self):
2684 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
2685 mode="rb")
2686
2687 def test_interrupterd_read_retry_text(self):
2688 self.check_interrupted_read_retry(lambda x: x,
2689 mode="r")
2690
2691 @unittest.skipUnless(threading, 'Threading required for this test.')
2692 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
2693 """Check that a buffered write, when it gets interrupted (either
2694 returning a partial result or EINTR), properly invokes the signal
2695 handler and retries if the latter returned successfully."""
2696 select = support.import_module("select")
2697 # A quantity that exceeds the buffer size of an anonymous pipe's
2698 # write end.
2699 N = 1024 * 1024
2700 r, w = os.pipe()
2701 fdopen_kwargs["closefd"] = False
2702 # We need a separate thread to read from the pipe and allow the
2703 # write() to finish. This thread is started after the SIGALRM is
2704 # received (forcing a first EINTR in write()).
2705 read_results = []
2706 write_finished = False
2707 def _read():
2708 while not write_finished:
2709 while r in select.select([r], [], [], 1.0)[0]:
2710 s = os.read(r, 1024)
2711 read_results.append(s)
2712 t = threading.Thread(target=_read)
2713 t.daemon = True
2714 def alarm1(sig, frame):
2715 signal.signal(signal.SIGALRM, alarm2)
2716 signal.alarm(1)
2717 def alarm2(sig, frame):
2718 t.start()
2719 signal.signal(signal.SIGALRM, alarm1)
2720 try:
2721 wio = self.io.open(w, **fdopen_kwargs)
2722 signal.alarm(1)
2723 # Expected behaviour:
2724 # - first raw write() is partial (because of the limited pipe buffer
2725 # and the first alarm)
2726 # - second raw write() returns EINTR (because of the second alarm)
2727 # - subsequent write()s are successful (either partial or complete)
2728 self.assertEqual(N, wio.write(item * N))
2729 wio.flush()
2730 write_finished = True
2731 t.join()
2732 self.assertEqual(N, sum(len(x) for x in read_results))
2733 finally:
2734 write_finished = True
2735 os.close(w)
2736 os.close(r)
2737 # This is deliberate. If we didn't close the file descriptor
2738 # before closing wio, wio would try to flush its internal
2739 # buffer, and could block (in case of failure).
2740 try:
2741 wio.close()
2742 except IOError as e:
2743 if e.errno != errno.EBADF:
2744 raise
2745
2746 def test_interrupterd_write_retry_buffered(self):
2747 self.check_interrupted_write_retry(b"x", mode="wb")
2748
2749 def test_interrupterd_write_retry_text(self):
2750 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
2751
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002752
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002753class CSignalsTest(SignalsTest):
2754 io = io
2755
2756class PySignalsTest(SignalsTest):
2757 io = pyio
2758
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002759 # Handling reentrancy issues would slow down _pyio even more, so the
2760 # tests are disabled.
2761 test_reentrant_write_buffered = None
2762 test_reentrant_write_text = None
2763
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002764
Christian Heimes1a6387e2008-03-26 12:49:49 +00002765def test_main():
Antoine Pitrou19690592009-06-12 20:14:08 +00002766 tests = (CIOTest, PyIOTest,
2767 CBufferedReaderTest, PyBufferedReaderTest,
2768 CBufferedWriterTest, PyBufferedWriterTest,
2769 CBufferedRWPairTest, PyBufferedRWPairTest,
2770 CBufferedRandomTest, PyBufferedRandomTest,
2771 StatefulIncrementalDecoderTest,
2772 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2773 CTextIOWrapperTest, PyTextIOWrapperTest,
2774 CMiscIOTest, PyMiscIOTest,
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002775 CSignalsTest, PySignalsTest,
Antoine Pitrou19690592009-06-12 20:14:08 +00002776 )
2777
2778 # Put the namespaces of the IO module we are testing and some useful mock
2779 # classes in the __dict__ of each test.
2780 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou6391b342010-09-14 18:48:19 +00002781 MockNonBlockWriterIO, MockRawIOWithoutRead)
Antoine Pitrou19690592009-06-12 20:14:08 +00002782 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2783 c_io_ns = dict((name, getattr(io, name)) for name in all_members)
2784 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
2785 globs = globals()
2786 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2787 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2788 # Avoid turning open into a bound method.
2789 py_io_ns["open"] = pyio.OpenWrapper
2790 for test in tests:
2791 if test.__name__.startswith("C"):
2792 for name, obj in c_io_ns.items():
2793 setattr(test, name, obj)
2794 elif test.__name__.startswith("Py"):
2795 for name, obj in py_io_ns.items():
2796 setattr(test, name, obj)
2797
2798 support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002799
2800if __name__ == "__main__":
Antoine Pitrou19690592009-06-12 20:14:08 +00002801 test_main()