blob: 0bad56debac33a28543be3a5ef49876bc9b2e442 [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Antoine Pitrou19690592009-06-12 20:14:08 +000032import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000033import signal
34import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000035from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000036from collections import deque
37from test import test_support as support
Christian Heimes1a6387e2008-03-26 12:49:49 +000038
39import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000042try:
43 import threading
44except ImportError:
45 threading = None
Antoine Pitrou19690592009-06-12 20:14:08 +000046
47__metaclass__ = type
48bytes = support.py3k_bytes
49
50def _default_chunk_size():
51 """Get the default TextIOWrapper chunk size"""
52 with io.open(__file__, "r", encoding="latin1") as f:
53 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000054
55
Antoine Pitrou6391b342010-09-14 18:48:19 +000056class MockRawIOWithoutRead:
57 """A RawIO implementation without read(), so as to exercise the default
58 RawIO.read() which calls readinto()."""
Christian Heimes1a6387e2008-03-26 12:49:49 +000059
60 def __init__(self, read_stack=()):
61 self._read_stack = list(read_stack)
62 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +000063 self._reads = 0
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000064 self._extraneous_reads = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +000065
Christian Heimes1a6387e2008-03-26 12:49:49 +000066 def write(self, b):
Antoine Pitrou19690592009-06-12 20:14:08 +000067 self._write_stack.append(bytes(b))
Christian Heimes1a6387e2008-03-26 12:49:49 +000068 return len(b)
69
70 def writable(self):
71 return True
72
73 def fileno(self):
74 return 42
75
76 def readable(self):
77 return True
78
79 def seekable(self):
80 return True
81
82 def seek(self, pos, whence):
Antoine Pitrou19690592009-06-12 20:14:08 +000083 return 0 # wrong but we gotta return something
Christian Heimes1a6387e2008-03-26 12:49:49 +000084
85 def tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +000086 return 0 # same comment as above
87
88 def readinto(self, buf):
89 self._reads += 1
90 max_len = len(buf)
91 try:
92 data = self._read_stack[0]
93 except IndexError:
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000094 self._extraneous_reads += 1
Antoine Pitrou19690592009-06-12 20:14:08 +000095 return 0
96 if data is None:
97 del self._read_stack[0]
98 return None
99 n = len(data)
100 if len(data) <= max_len:
101 del self._read_stack[0]
102 buf[:n] = data
103 return n
104 else:
105 buf[:] = data[:max_len]
106 self._read_stack[0] = data[max_len:]
107 return max_len
108
109 def truncate(self, pos=None):
110 return pos
111
Antoine Pitrou6391b342010-09-14 18:48:19 +0000112class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
113 pass
114
115class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
116 pass
117
118
119class MockRawIO(MockRawIOWithoutRead):
120
121 def read(self, n=None):
122 self._reads += 1
123 try:
124 return self._read_stack.pop(0)
125 except:
126 self._extraneous_reads += 1
127 return b""
128
Antoine Pitrou19690592009-06-12 20:14:08 +0000129class CMockRawIO(MockRawIO, io.RawIOBase):
130 pass
131
132class PyMockRawIO(MockRawIO, pyio.RawIOBase):
133 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000134
135
Antoine Pitrou19690592009-06-12 20:14:08 +0000136class MisbehavedRawIO(MockRawIO):
137 def write(self, b):
138 return MockRawIO.write(self, b) * 2
139
140 def read(self, n=None):
141 return MockRawIO.read(self, n) * 2
142
143 def seek(self, pos, whence):
144 return -123
145
146 def tell(self):
147 return -456
148
149 def readinto(self, buf):
150 MockRawIO.readinto(self, buf)
151 return len(buf) * 5
152
153class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
154 pass
155
156class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
157 pass
158
159
160class CloseFailureIO(MockRawIO):
161 closed = 0
162
163 def close(self):
164 if not self.closed:
165 self.closed = 1
166 raise IOError
167
168class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
169 pass
170
171class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
172 pass
173
174
175class MockFileIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000176
177 def __init__(self, data):
178 self.read_history = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000179 super(MockFileIO, self).__init__(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000180
181 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +0000182 res = super(MockFileIO, self).read(n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000183 self.read_history.append(None if res is None else len(res))
184 return res
185
Antoine Pitrou19690592009-06-12 20:14:08 +0000186 def readinto(self, b):
187 res = super(MockFileIO, self).readinto(b)
188 self.read_history.append(res)
189 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000190
Antoine Pitrou19690592009-06-12 20:14:08 +0000191class CMockFileIO(MockFileIO, io.BytesIO):
192 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000193
Antoine Pitrou19690592009-06-12 20:14:08 +0000194class PyMockFileIO(MockFileIO, pyio.BytesIO):
195 pass
196
197
198class MockNonBlockWriterIO:
199
200 def __init__(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000201 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000202 self._blocker_char = None
Christian Heimes1a6387e2008-03-26 12:49:49 +0000203
Antoine Pitrou19690592009-06-12 20:14:08 +0000204 def pop_written(self):
205 s = b"".join(self._write_stack)
206 self._write_stack[:] = []
207 return s
208
209 def block_on(self, char):
210 """Block when a given char is encountered."""
211 self._blocker_char = char
212
213 def readable(self):
214 return True
215
216 def seekable(self):
217 return True
Christian Heimes1a6387e2008-03-26 12:49:49 +0000218
219 def writable(self):
220 return True
221
Antoine Pitrou19690592009-06-12 20:14:08 +0000222 def write(self, b):
223 b = bytes(b)
224 n = -1
225 if self._blocker_char:
226 try:
227 n = b.index(self._blocker_char)
228 except ValueError:
229 pass
230 else:
231 self._blocker_char = None
232 self._write_stack.append(b[:n])
233 raise self.BlockingIOError(0, "test blocking", n)
234 self._write_stack.append(b)
235 return len(b)
236
237class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
238 BlockingIOError = io.BlockingIOError
239
240class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
241 BlockingIOError = pyio.BlockingIOError
242
Christian Heimes1a6387e2008-03-26 12:49:49 +0000243
244class IOTest(unittest.TestCase):
245
Antoine Pitrou19690592009-06-12 20:14:08 +0000246 def setUp(self):
247 support.unlink(support.TESTFN)
248
Christian Heimes1a6387e2008-03-26 12:49:49 +0000249 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000250 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000251
252 def write_ops(self, f):
253 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000254 f.truncate(0)
255 self.assertEqual(f.tell(), 5)
256 f.seek(0)
257
258 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000259 self.assertEqual(f.seek(0), 0)
260 self.assertEqual(f.write(b"Hello."), 6)
261 self.assertEqual(f.tell(), 6)
262 self.assertEqual(f.seek(-1, 1), 5)
263 self.assertEqual(f.tell(), 5)
264 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
265 self.assertEqual(f.seek(0), 0)
266 self.assertEqual(f.write(b"h"), 1)
267 self.assertEqual(f.seek(-1, 2), 13)
268 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000269
Christian Heimes1a6387e2008-03-26 12:49:49 +0000270 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000271 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000272 self.assertRaises(TypeError, f.seek, 0.0)
273
274 def read_ops(self, f, buffered=False):
275 data = f.read(5)
276 self.assertEqual(data, b"hello")
277 data = bytearray(data)
278 self.assertEqual(f.readinto(data), 5)
279 self.assertEqual(data, b" worl")
280 self.assertEqual(f.readinto(data), 2)
281 self.assertEqual(len(data), 5)
282 self.assertEqual(data[:2], b"d\n")
283 self.assertEqual(f.seek(0), 0)
284 self.assertEqual(f.read(20), b"hello world\n")
285 self.assertEqual(f.read(1), b"")
286 self.assertEqual(f.readinto(bytearray(b"x")), 0)
287 self.assertEqual(f.seek(-6, 2), 6)
288 self.assertEqual(f.read(5), b"world")
289 self.assertEqual(f.read(0), b"")
290 self.assertEqual(f.readinto(bytearray()), 0)
291 self.assertEqual(f.seek(-6, 1), 5)
292 self.assertEqual(f.read(5), b" worl")
293 self.assertEqual(f.tell(), 10)
294 self.assertRaises(TypeError, f.seek, 0.0)
295 if buffered:
296 f.seek(0)
297 self.assertEqual(f.read(), b"hello world\n")
298 f.seek(6)
299 self.assertEqual(f.read(), b"world\n")
300 self.assertEqual(f.read(), b"")
301
302 LARGE = 2**31
303
304 def large_file_ops(self, f):
305 assert f.readable()
306 assert f.writable()
307 self.assertEqual(f.seek(self.LARGE), self.LARGE)
308 self.assertEqual(f.tell(), self.LARGE)
309 self.assertEqual(f.write(b"xxx"), 3)
310 self.assertEqual(f.tell(), self.LARGE + 3)
311 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
312 self.assertEqual(f.truncate(), self.LARGE + 2)
313 self.assertEqual(f.tell(), self.LARGE + 2)
314 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
315 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000316 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000317 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
318 self.assertEqual(f.seek(-1, 2), self.LARGE)
319 self.assertEqual(f.read(2), b"x")
320
Antoine Pitrou19690592009-06-12 20:14:08 +0000321 def test_invalid_operations(self):
322 # Try writing on a file opened in read mode and vice-versa.
323 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000324 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000325 self.assertRaises(IOError, fp.read)
326 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000327 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000328 self.assertRaises(IOError, fp.write, b"blah")
329 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000330 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000331 self.assertRaises(IOError, fp.write, "blah")
332 self.assertRaises(IOError, fp.writelines, ["blah\n"])
333
Christian Heimes1a6387e2008-03-26 12:49:49 +0000334 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000335 with self.open(support.TESTFN, "wb", buffering=0) as f:
336 self.assertEqual(f.readable(), False)
337 self.assertEqual(f.writable(), True)
338 self.assertEqual(f.seekable(), True)
339 self.write_ops(f)
340 with self.open(support.TESTFN, "rb", buffering=0) as f:
341 self.assertEqual(f.readable(), True)
342 self.assertEqual(f.writable(), False)
343 self.assertEqual(f.seekable(), True)
344 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000345
346 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000347 with self.open(support.TESTFN, "wb") as f:
348 self.assertEqual(f.readable(), False)
349 self.assertEqual(f.writable(), True)
350 self.assertEqual(f.seekable(), True)
351 self.write_ops(f)
352 with self.open(support.TESTFN, "rb") as f:
353 self.assertEqual(f.readable(), True)
354 self.assertEqual(f.writable(), False)
355 self.assertEqual(f.seekable(), True)
356 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000357
358 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000359 with self.open(support.TESTFN, "wb") as f:
360 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
361 with self.open(support.TESTFN, "rb") as f:
362 self.assertEqual(f.readline(), b"abc\n")
363 self.assertEqual(f.readline(10), b"def\n")
364 self.assertEqual(f.readline(2), b"xy")
365 self.assertEqual(f.readline(4), b"zzy\n")
366 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000367 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000368 self.assertRaises(TypeError, f.readline, 5.3)
369 with self.open(support.TESTFN, "r") as f:
370 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000371
372 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000373 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000374 self.write_ops(f)
375 data = f.getvalue()
376 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000377 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000378 self.read_ops(f, True)
379
380 def test_large_file_ops(self):
381 # On Windows and Mac OSX this test comsumes large resources; It takes
382 # a long time to build the >2GB file and takes >2GB of disk space
383 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000384 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
385 if not support.is_resource_enabled("largefile"):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000386 print("\nTesting large file ops skipped on %s." % sys.platform,
387 file=sys.stderr)
388 print("It requires %d bytes and a long time." % self.LARGE,
389 file=sys.stderr)
390 print("Use 'regrtest.py -u largefile test_io' to run it.",
391 file=sys.stderr)
392 return
Antoine Pitrou19690592009-06-12 20:14:08 +0000393 with self.open(support.TESTFN, "w+b", 0) as f:
394 self.large_file_ops(f)
395 with self.open(support.TESTFN, "w+b") as f:
396 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000397
398 def test_with_open(self):
399 for bufsize in (0, 1, 100):
400 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000401 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000402 f.write(b"xxx")
403 self.assertEqual(f.closed, True)
404 f = None
405 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000406 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000407 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000408 except ZeroDivisionError:
409 self.assertEqual(f.closed, True)
410 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000411 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000412
Antoine Pitroue741cc62009-01-21 00:45:36 +0000413 # issue 5008
414 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000415 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000416 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000417 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000418 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000419 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000420 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000421 with self.open(support.TESTFN, "a") as f:
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000422 self.assertTrue(f.tell() > 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000423
Christian Heimes1a6387e2008-03-26 12:49:49 +0000424 def test_destructor(self):
425 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000426 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000427 def __del__(self):
428 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000429 try:
430 f = super(MyFileIO, self).__del__
431 except AttributeError:
432 pass
433 else:
434 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000435 def close(self):
436 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000437 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000438 def flush(self):
439 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000440 super(MyFileIO, self).flush()
441 f = MyFileIO(support.TESTFN, "wb")
442 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000443 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000444 support.gc_collect()
445 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000446 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000447 self.assertEqual(f.read(), b"xxx")
448
449 def _check_base_destructor(self, base):
450 record = []
451 class MyIO(base):
452 def __init__(self):
453 # This exercises the availability of attributes on object
454 # destruction.
455 # (in the C version, close() is called by the tp_dealloc
456 # function, not by __del__)
457 self.on_del = 1
458 self.on_close = 2
459 self.on_flush = 3
460 def __del__(self):
461 record.append(self.on_del)
462 try:
463 f = super(MyIO, self).__del__
464 except AttributeError:
465 pass
466 else:
467 f()
468 def close(self):
469 record.append(self.on_close)
470 super(MyIO, self).close()
471 def flush(self):
472 record.append(self.on_flush)
473 super(MyIO, self).flush()
474 f = MyIO()
475 del f
476 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000477 self.assertEqual(record, [1, 2, 3])
478
Antoine Pitrou19690592009-06-12 20:14:08 +0000479 def test_IOBase_destructor(self):
480 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000481
Antoine Pitrou19690592009-06-12 20:14:08 +0000482 def test_RawIOBase_destructor(self):
483 self._check_base_destructor(self.RawIOBase)
484
485 def test_BufferedIOBase_destructor(self):
486 self._check_base_destructor(self.BufferedIOBase)
487
488 def test_TextIOBase_destructor(self):
489 self._check_base_destructor(self.TextIOBase)
490
491 def test_close_flushes(self):
492 with self.open(support.TESTFN, "wb") as f:
493 f.write(b"xxx")
494 with self.open(support.TESTFN, "rb") as f:
495 self.assertEqual(f.read(), b"xxx")
496
497 def test_array_writes(self):
498 a = array.array(b'i', range(10))
499 n = len(a.tostring())
500 with self.open(support.TESTFN, "wb", 0) as f:
501 self.assertEqual(f.write(a), n)
502 with self.open(support.TESTFN, "wb") as f:
503 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000504
505 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000506 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000507 closefd=False)
508
Antoine Pitrou19690592009-06-12 20:14:08 +0000509 def test_read_closed(self):
510 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000511 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000512 with self.open(support.TESTFN, "r") as f:
513 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000514 self.assertEqual(file.read(), "egg\n")
515 file.seek(0)
516 file.close()
517 self.assertRaises(ValueError, file.read)
518
519 def test_no_closefd_with_filename(self):
520 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000521 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000522
523 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000524 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000525 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000526 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000527 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000528 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000529 self.assertEqual(file.buffer.raw.closefd, False)
530
Antoine Pitrou19690592009-06-12 20:14:08 +0000531 def test_garbage_collection(self):
532 # FileIO objects are collected, and collecting them flushes
533 # all data to disk.
534 f = self.FileIO(support.TESTFN, "wb")
535 f.write(b"abcxxx")
536 f.f = f
537 wr = weakref.ref(f)
538 del f
539 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000540 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000541 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000542 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000543
Antoine Pitrou19690592009-06-12 20:14:08 +0000544 def test_unbounded_file(self):
545 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
546 zero = "/dev/zero"
547 if not os.path.exists(zero):
548 self.skipTest("{0} does not exist".format(zero))
549 if sys.maxsize > 0x7FFFFFFF:
550 self.skipTest("test can only run in a 32-bit address space")
551 if support.real_max_memuse < support._2G:
552 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000553 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000554 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000555 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000556 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000557 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000558 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000559
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000560 def test_flush_error_on_close(self):
561 f = self.open(support.TESTFN, "wb", buffering=0)
562 def bad_flush():
563 raise IOError()
564 f.flush = bad_flush
565 self.assertRaises(IOError, f.close) # exception not swallowed
566
567 def test_multi_close(self):
568 f = self.open(support.TESTFN, "wb", buffering=0)
569 f.close()
570 f.close()
571 f.close()
572 self.assertRaises(ValueError, f.flush)
573
Antoine Pitrou6391b342010-09-14 18:48:19 +0000574 def test_RawIOBase_read(self):
575 # Exercise the default RawIOBase.read() implementation (which calls
576 # readinto() internally).
577 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
578 self.assertEqual(rawio.read(2), b"ab")
579 self.assertEqual(rawio.read(2), b"c")
580 self.assertEqual(rawio.read(2), b"d")
581 self.assertEqual(rawio.read(2), None)
582 self.assertEqual(rawio.read(2), b"ef")
583 self.assertEqual(rawio.read(2), b"g")
584 self.assertEqual(rawio.read(2), None)
585 self.assertEqual(rawio.read(2), b"")
586
Antoine Pitrou19690592009-06-12 20:14:08 +0000587class CIOTest(IOTest):
588 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000589
Antoine Pitrou19690592009-06-12 20:14:08 +0000590class PyIOTest(IOTest):
591 test_array_writes = unittest.skip(
592 "len(array.array) returns number of elements rather than bytelength"
593 )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000594
595
Antoine Pitrou19690592009-06-12 20:14:08 +0000596class CommonBufferedTests:
597 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
598
599 def test_detach(self):
600 raw = self.MockRawIO()
601 buf = self.tp(raw)
602 self.assertIs(buf.detach(), raw)
603 self.assertRaises(ValueError, buf.detach)
604
605 def test_fileno(self):
606 rawio = self.MockRawIO()
607 bufio = self.tp(rawio)
608
Ezio Melotti2623a372010-11-21 13:34:58 +0000609 self.assertEqual(42, bufio.fileno())
Antoine Pitrou19690592009-06-12 20:14:08 +0000610
611 def test_no_fileno(self):
612 # XXX will we always have fileno() function? If so, kill
613 # this test. Else, write it.
614 pass
615
616 def test_invalid_args(self):
617 rawio = self.MockRawIO()
618 bufio = self.tp(rawio)
619 # Invalid whence
620 self.assertRaises(ValueError, bufio.seek, 0, -1)
621 self.assertRaises(ValueError, bufio.seek, 0, 3)
622
623 def test_override_destructor(self):
624 tp = self.tp
625 record = []
626 class MyBufferedIO(tp):
627 def __del__(self):
628 record.append(1)
629 try:
630 f = super(MyBufferedIO, self).__del__
631 except AttributeError:
632 pass
633 else:
634 f()
635 def close(self):
636 record.append(2)
637 super(MyBufferedIO, self).close()
638 def flush(self):
639 record.append(3)
640 super(MyBufferedIO, self).flush()
641 rawio = self.MockRawIO()
642 bufio = MyBufferedIO(rawio)
643 writable = bufio.writable()
644 del bufio
645 support.gc_collect()
646 if writable:
647 self.assertEqual(record, [1, 2, 3])
648 else:
649 self.assertEqual(record, [1, 2])
650
651 def test_context_manager(self):
652 # Test usability as a context manager
653 rawio = self.MockRawIO()
654 bufio = self.tp(rawio)
655 def _with():
656 with bufio:
657 pass
658 _with()
659 # bufio should now be closed, and using it a second time should raise
660 # a ValueError.
661 self.assertRaises(ValueError, _with)
662
663 def test_error_through_destructor(self):
664 # Test that the exception state is not modified by a destructor,
665 # even if close() fails.
666 rawio = self.CloseFailureIO()
667 def f():
668 self.tp(rawio).xyzzy
669 with support.captured_output("stderr") as s:
670 self.assertRaises(AttributeError, f)
671 s = s.getvalue().strip()
672 if s:
673 # The destructor *may* have printed an unraisable error, check it
674 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000675 self.assertTrue(s.startswith("Exception IOError: "), s)
676 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000677
678 def test_repr(self):
679 raw = self.MockRawIO()
680 b = self.tp(raw)
681 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
682 self.assertEqual(repr(b), "<%s>" % clsname)
683 raw.name = "dummy"
684 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
685 raw.name = b"dummy"
686 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000687
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000688 def test_flush_error_on_close(self):
689 raw = self.MockRawIO()
690 def bad_flush():
691 raise IOError()
692 raw.flush = bad_flush
693 b = self.tp(raw)
694 self.assertRaises(IOError, b.close) # exception not swallowed
695
696 def test_multi_close(self):
697 raw = self.MockRawIO()
698 b = self.tp(raw)
699 b.close()
700 b.close()
701 b.close()
702 self.assertRaises(ValueError, b.flush)
703
Antoine Pitroufc9ead62010-12-21 21:26:55 +0000704 def test_readonly_attributes(self):
705 raw = self.MockRawIO()
706 buf = self.tp(raw)
707 x = self.MockRawIO()
708 with self.assertRaises((AttributeError, TypeError)):
709 buf.raw = x
710
Christian Heimes1a6387e2008-03-26 12:49:49 +0000711
Antoine Pitrou19690592009-06-12 20:14:08 +0000712class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
713 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000714
Antoine Pitrou19690592009-06-12 20:14:08 +0000715 def test_constructor(self):
716 rawio = self.MockRawIO([b"abc"])
717 bufio = self.tp(rawio)
718 bufio.__init__(rawio)
719 bufio.__init__(rawio, buffer_size=1024)
720 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000721 self.assertEqual(b"abc", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000722 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
723 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
724 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
725 rawio = self.MockRawIO([b"abc"])
726 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000727 self.assertEqual(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000728
Antoine Pitrou19690592009-06-12 20:14:08 +0000729 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000730 for arg in (None, 7):
731 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
732 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000733 self.assertEqual(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000734 # Invalid args
735 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000736
Antoine Pitrou19690592009-06-12 20:14:08 +0000737 def test_read1(self):
738 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
739 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000740 self.assertEqual(b"a", bufio.read(1))
741 self.assertEqual(b"b", bufio.read1(1))
742 self.assertEqual(rawio._reads, 1)
743 self.assertEqual(b"c", bufio.read1(100))
744 self.assertEqual(rawio._reads, 1)
745 self.assertEqual(b"d", bufio.read1(100))
746 self.assertEqual(rawio._reads, 2)
747 self.assertEqual(b"efg", bufio.read1(100))
748 self.assertEqual(rawio._reads, 3)
749 self.assertEqual(b"", bufio.read1(100))
750 self.assertEqual(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000751 # Invalid args
752 self.assertRaises(ValueError, bufio.read1, -1)
753
754 def test_readinto(self):
755 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
756 bufio = self.tp(rawio)
757 b = bytearray(2)
Ezio Melotti2623a372010-11-21 13:34:58 +0000758 self.assertEqual(bufio.readinto(b), 2)
759 self.assertEqual(b, b"ab")
760 self.assertEqual(bufio.readinto(b), 2)
761 self.assertEqual(b, b"cd")
762 self.assertEqual(bufio.readinto(b), 2)
763 self.assertEqual(b, b"ef")
764 self.assertEqual(bufio.readinto(b), 1)
765 self.assertEqual(b, b"gf")
766 self.assertEqual(bufio.readinto(b), 0)
767 self.assertEqual(b, b"gf")
Antoine Pitrou19690592009-06-12 20:14:08 +0000768
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000769 def test_readlines(self):
770 def bufio():
771 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
772 return self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000773 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
774 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
775 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000776
Antoine Pitrou19690592009-06-12 20:14:08 +0000777 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000778 data = b"abcdefghi"
779 dlen = len(data)
780
781 tests = [
782 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
783 [ 100, [ 3, 3, 3], [ dlen ] ],
784 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
785 ]
786
787 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000788 rawio = self.MockFileIO(data)
789 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000790 pos = 0
791 for nbytes in buf_read_sizes:
Ezio Melotti2623a372010-11-21 13:34:58 +0000792 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Christian Heimes1a6387e2008-03-26 12:49:49 +0000793 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000794 # this is mildly implementation-dependent
Ezio Melotti2623a372010-11-21 13:34:58 +0000795 self.assertEqual(rawio.read_history, raw_read_sizes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000796
Antoine Pitrou19690592009-06-12 20:14:08 +0000797 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000798 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000799 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
800 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000801
Ezio Melotti2623a372010-11-21 13:34:58 +0000802 self.assertEqual(b"abcd", bufio.read(6))
803 self.assertEqual(b"e", bufio.read(1))
804 self.assertEqual(b"fg", bufio.read())
805 self.assertEqual(b"", bufio.peek(1))
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000806 self.assertTrue(None is bufio.read())
Ezio Melotti2623a372010-11-21 13:34:58 +0000807 self.assertEqual(b"", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000808
Antoine Pitrou19690592009-06-12 20:14:08 +0000809 def test_read_past_eof(self):
810 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
811 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000812
Ezio Melotti2623a372010-11-21 13:34:58 +0000813 self.assertEqual(b"abcdefg", bufio.read(9000))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000814
Antoine Pitrou19690592009-06-12 20:14:08 +0000815 def test_read_all(self):
816 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
817 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000818
Ezio Melotti2623a372010-11-21 13:34:58 +0000819 self.assertEqual(b"abcdefg", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000820
Victor Stinner6a102812010-04-27 23:55:59 +0000821 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +0000822 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +0000823 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000824 try:
825 # Write out many bytes with exactly the same number of 0's,
826 # 1's... 255's. This will help us check that concurrent reading
827 # doesn't duplicate or forget contents.
828 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +0000829 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000830 random.shuffle(l)
831 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000832 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000833 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000834 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000835 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000836 errors = []
837 results = []
838 def f():
839 try:
840 # Intra-buffer read then buffer-flushing read
841 for n in cycle([1, 19]):
842 s = bufio.read(n)
843 if not s:
844 break
845 # list.append() is atomic
846 results.append(s)
847 except Exception as e:
848 errors.append(e)
849 raise
850 threads = [threading.Thread(target=f) for x in range(20)]
851 for t in threads:
852 t.start()
853 time.sleep(0.02) # yield
854 for t in threads:
855 t.join()
856 self.assertFalse(errors,
857 "the following exceptions were caught: %r" % errors)
858 s = b''.join(results)
859 for i in range(256):
860 c = bytes(bytearray([i]))
861 self.assertEqual(s.count(c), N)
862 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +0000863 support.unlink(support.TESTFN)
864
865 def test_misbehaved_io(self):
866 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
867 bufio = self.tp(rawio)
868 self.assertRaises(IOError, bufio.seek, 0)
869 self.assertRaises(IOError, bufio.tell)
870
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000871 def test_no_extraneous_read(self):
872 # Issue #9550; when the raw IO object has satisfied the read request,
873 # we should not issue any additional reads, otherwise it may block
874 # (e.g. socket).
875 bufsize = 16
876 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
877 rawio = self.MockRawIO([b"x" * n])
878 bufio = self.tp(rawio, bufsize)
879 self.assertEqual(bufio.read(n), b"x" * n)
880 # Simple case: one raw read is enough to satisfy the request.
881 self.assertEqual(rawio._extraneous_reads, 0,
882 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
883 # A more complex case where two raw reads are needed to satisfy
884 # the request.
885 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
886 bufio = self.tp(rawio, bufsize)
887 self.assertEqual(bufio.read(n), b"x" * n)
888 self.assertEqual(rawio._extraneous_reads, 0,
889 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
890
891
Antoine Pitrou19690592009-06-12 20:14:08 +0000892class CBufferedReaderTest(BufferedReaderTest):
893 tp = io.BufferedReader
894
895 def test_constructor(self):
896 BufferedReaderTest.test_constructor(self)
897 # The allocation can succeed on 32-bit builds, e.g. with more
898 # than 2GB RAM and a 64-bit kernel.
899 if sys.maxsize > 0x7FFFFFFF:
900 rawio = self.MockRawIO()
901 bufio = self.tp(rawio)
902 self.assertRaises((OverflowError, MemoryError, ValueError),
903 bufio.__init__, rawio, sys.maxsize)
904
905 def test_initialization(self):
906 rawio = self.MockRawIO([b"abc"])
907 bufio = self.tp(rawio)
908 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
909 self.assertRaises(ValueError, bufio.read)
910 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
911 self.assertRaises(ValueError, bufio.read)
912 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
913 self.assertRaises(ValueError, bufio.read)
914
915 def test_misbehaved_io_read(self):
916 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
917 bufio = self.tp(rawio)
918 # _pyio.BufferedReader seems to implement reading different, so that
919 # checking this is not so easy.
920 self.assertRaises(IOError, bufio.read, 10)
921
922 def test_garbage_collection(self):
923 # C BufferedReader objects are collected.
924 # The Python version has __del__, so it ends into gc.garbage instead
925 rawio = self.FileIO(support.TESTFN, "w+b")
926 f = self.tp(rawio)
927 f.f = f
928 wr = weakref.ref(f)
929 del f
930 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000931 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +0000932
933class PyBufferedReaderTest(BufferedReaderTest):
934 tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000935
936
Antoine Pitrou19690592009-06-12 20:14:08 +0000937class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
938 write_mode = "wb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000939
Antoine Pitrou19690592009-06-12 20:14:08 +0000940 def test_constructor(self):
941 rawio = self.MockRawIO()
942 bufio = self.tp(rawio)
943 bufio.__init__(rawio)
944 bufio.__init__(rawio, buffer_size=1024)
945 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000946 self.assertEqual(3, bufio.write(b"abc"))
Antoine Pitrou19690592009-06-12 20:14:08 +0000947 bufio.flush()
948 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
949 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
950 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
951 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000952 self.assertEqual(3, bufio.write(b"ghi"))
Antoine Pitrou19690592009-06-12 20:14:08 +0000953 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +0000954 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000955
Antoine Pitrou19690592009-06-12 20:14:08 +0000956 def test_detach_flush(self):
957 raw = self.MockRawIO()
958 buf = self.tp(raw)
959 buf.write(b"howdy!")
960 self.assertFalse(raw._write_stack)
961 buf.detach()
962 self.assertEqual(raw._write_stack, [b"howdy!"])
963
964 def test_write(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000965 # Write to the buffered IO but don't overflow the buffer.
Antoine Pitrou19690592009-06-12 20:14:08 +0000966 writer = self.MockRawIO()
967 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000968 bufio.write(b"abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000969 self.assertFalse(writer._write_stack)
970
Antoine Pitrou19690592009-06-12 20:14:08 +0000971 def test_write_overflow(self):
972 writer = self.MockRawIO()
973 bufio = self.tp(writer, 8)
974 contents = b"abcdefghijklmnop"
975 for n in range(0, len(contents), 3):
976 bufio.write(contents[n:n+3])
977 flushed = b"".join(writer._write_stack)
978 # At least (total - 8) bytes were implicitly flushed, perhaps more
979 # depending on the implementation.
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000980 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000981
Antoine Pitrou19690592009-06-12 20:14:08 +0000982 def check_writes(self, intermediate_func):
983 # Lots of writes, test the flushed output is as expected.
984 contents = bytes(range(256)) * 1000
985 n = 0
986 writer = self.MockRawIO()
987 bufio = self.tp(writer, 13)
988 # Generator of write sizes: repeat each N 15 times then proceed to N+1
989 def gen_sizes():
990 for size in count(1):
991 for i in range(15):
992 yield size
993 sizes = gen_sizes()
994 while n < len(contents):
995 size = min(next(sizes), len(contents) - n)
Ezio Melotti2623a372010-11-21 13:34:58 +0000996 self.assertEqual(bufio.write(contents[n:n+size]), size)
Antoine Pitrou19690592009-06-12 20:14:08 +0000997 intermediate_func(bufio)
998 n += size
999 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001000 self.assertEqual(contents,
Antoine Pitrou19690592009-06-12 20:14:08 +00001001 b"".join(writer._write_stack))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001002
Antoine Pitrou19690592009-06-12 20:14:08 +00001003 def test_writes(self):
1004 self.check_writes(lambda bufio: None)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001005
Antoine Pitrou19690592009-06-12 20:14:08 +00001006 def test_writes_and_flushes(self):
1007 self.check_writes(lambda bufio: bufio.flush())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001008
Antoine Pitrou19690592009-06-12 20:14:08 +00001009 def test_writes_and_seeks(self):
1010 def _seekabs(bufio):
1011 pos = bufio.tell()
1012 bufio.seek(pos + 1, 0)
1013 bufio.seek(pos - 1, 0)
1014 bufio.seek(pos, 0)
1015 self.check_writes(_seekabs)
1016 def _seekrel(bufio):
1017 pos = bufio.seek(0, 1)
1018 bufio.seek(+1, 1)
1019 bufio.seek(-1, 1)
1020 bufio.seek(pos, 0)
1021 self.check_writes(_seekrel)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001022
Antoine Pitrou19690592009-06-12 20:14:08 +00001023 def test_writes_and_truncates(self):
1024 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001025
Antoine Pitrou19690592009-06-12 20:14:08 +00001026 def test_write_non_blocking(self):
1027 raw = self.MockNonBlockWriterIO()
1028 bufio = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001029
Ezio Melotti2623a372010-11-21 13:34:58 +00001030 self.assertEqual(bufio.write(b"abcd"), 4)
1031 self.assertEqual(bufio.write(b"efghi"), 5)
Antoine Pitrou19690592009-06-12 20:14:08 +00001032 # 1 byte will be written, the rest will be buffered
1033 raw.block_on(b"k")
Ezio Melotti2623a372010-11-21 13:34:58 +00001034 self.assertEqual(bufio.write(b"jklmn"), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001035
Antoine Pitrou19690592009-06-12 20:14:08 +00001036 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1037 raw.block_on(b"0")
1038 try:
1039 bufio.write(b"opqrwxyz0123456789")
1040 except self.BlockingIOError as e:
1041 written = e.characters_written
1042 else:
1043 self.fail("BlockingIOError should have been raised")
Ezio Melotti2623a372010-11-21 13:34:58 +00001044 self.assertEqual(written, 16)
1045 self.assertEqual(raw.pop_written(),
Antoine Pitrou19690592009-06-12 20:14:08 +00001046 b"abcdefghijklmnopqrwxyz")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001047
Ezio Melotti2623a372010-11-21 13:34:58 +00001048 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Antoine Pitrou19690592009-06-12 20:14:08 +00001049 s = raw.pop_written()
1050 # Previously buffered bytes were flushed
1051 self.assertTrue(s.startswith(b"01234567A"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001052
Antoine Pitrou19690592009-06-12 20:14:08 +00001053 def test_write_and_rewind(self):
1054 raw = io.BytesIO()
1055 bufio = self.tp(raw, 4)
1056 self.assertEqual(bufio.write(b"abcdef"), 6)
1057 self.assertEqual(bufio.tell(), 6)
1058 bufio.seek(0, 0)
1059 self.assertEqual(bufio.write(b"XY"), 2)
1060 bufio.seek(6, 0)
1061 self.assertEqual(raw.getvalue(), b"XYcdef")
1062 self.assertEqual(bufio.write(b"123456"), 6)
1063 bufio.flush()
1064 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001065
Antoine Pitrou19690592009-06-12 20:14:08 +00001066 def test_flush(self):
1067 writer = self.MockRawIO()
1068 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001069 bufio.write(b"abc")
1070 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001071 self.assertEqual(b"abc", writer._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001072
Antoine Pitrou19690592009-06-12 20:14:08 +00001073 def test_destructor(self):
1074 writer = self.MockRawIO()
1075 bufio = self.tp(writer, 8)
1076 bufio.write(b"abc")
1077 del bufio
1078 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001079 self.assertEqual(b"abc", writer._write_stack[0])
Antoine Pitrou19690592009-06-12 20:14:08 +00001080
1081 def test_truncate(self):
1082 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001083 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001084 bufio = self.tp(raw, 8)
1085 bufio.write(b"abcdef")
1086 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001087 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001088 with self.open(support.TESTFN, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001089 self.assertEqual(f.read(), b"abc")
1090
Victor Stinner6a102812010-04-27 23:55:59 +00001091 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +00001092 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +00001093 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001094 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001095 # Write out many bytes from many threads and test they were
1096 # all flushed.
1097 N = 1000
1098 contents = bytes(range(256)) * N
1099 sizes = cycle([1, 19])
1100 n = 0
1101 queue = deque()
1102 while n < len(contents):
1103 size = next(sizes)
1104 queue.append(contents[n:n+size])
1105 n += size
1106 del contents
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001107 # We use a real file object because it allows us to
1108 # exercise situations where the GIL is released before
1109 # writing the buffer to the raw streams. This is in addition
1110 # to concurrency issues due to switching threads in the middle
1111 # of Python code.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001112 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001113 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001114 errors = []
1115 def f():
1116 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001117 while True:
1118 try:
1119 s = queue.popleft()
1120 except IndexError:
1121 return
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001122 bufio.write(s)
1123 except Exception as e:
1124 errors.append(e)
1125 raise
1126 threads = [threading.Thread(target=f) for x in range(20)]
1127 for t in threads:
1128 t.start()
1129 time.sleep(0.02) # yield
1130 for t in threads:
1131 t.join()
1132 self.assertFalse(errors,
1133 "the following exceptions were caught: %r" % errors)
Antoine Pitrou19690592009-06-12 20:14:08 +00001134 bufio.close()
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001135 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001136 s = f.read()
1137 for i in range(256):
Ezio Melotti2623a372010-11-21 13:34:58 +00001138 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001139 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001140 support.unlink(support.TESTFN)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001141
Antoine Pitrou19690592009-06-12 20:14:08 +00001142 def test_misbehaved_io(self):
1143 rawio = self.MisbehavedRawIO()
1144 bufio = self.tp(rawio, 5)
1145 self.assertRaises(IOError, bufio.seek, 0)
1146 self.assertRaises(IOError, bufio.tell)
1147 self.assertRaises(IOError, bufio.write, b"abcdef")
1148
1149 def test_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001150 with support.check_warnings(("max_buffer_size is deprecated",
1151 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001152 self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001153
1154
1155class CBufferedWriterTest(BufferedWriterTest):
1156 tp = io.BufferedWriter
1157
1158 def test_constructor(self):
1159 BufferedWriterTest.test_constructor(self)
1160 # The allocation can succeed on 32-bit builds, e.g. with more
1161 # than 2GB RAM and a 64-bit kernel.
1162 if sys.maxsize > 0x7FFFFFFF:
1163 rawio = self.MockRawIO()
1164 bufio = self.tp(rawio)
1165 self.assertRaises((OverflowError, MemoryError, ValueError),
1166 bufio.__init__, rawio, sys.maxsize)
1167
1168 def test_initialization(self):
1169 rawio = self.MockRawIO()
1170 bufio = self.tp(rawio)
1171 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1172 self.assertRaises(ValueError, bufio.write, b"def")
1173 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1174 self.assertRaises(ValueError, bufio.write, b"def")
1175 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1176 self.assertRaises(ValueError, bufio.write, b"def")
1177
1178 def test_garbage_collection(self):
1179 # C BufferedWriter objects are collected, and collecting them flushes
1180 # all data to disk.
1181 # The Python version has __del__, so it ends into gc.garbage instead
1182 rawio = self.FileIO(support.TESTFN, "w+b")
1183 f = self.tp(rawio)
1184 f.write(b"123xxx")
1185 f.x = f
1186 wr = weakref.ref(f)
1187 del f
1188 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001189 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001190 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001191 self.assertEqual(f.read(), b"123xxx")
1192
1193
1194class PyBufferedWriterTest(BufferedWriterTest):
1195 tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001196
1197class BufferedRWPairTest(unittest.TestCase):
1198
Antoine Pitrou19690592009-06-12 20:14:08 +00001199 def test_constructor(self):
1200 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001201 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001202
Antoine Pitrou19690592009-06-12 20:14:08 +00001203 def test_detach(self):
1204 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1205 self.assertRaises(self.UnsupportedOperation, pair.detach)
1206
1207 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001208 with support.check_warnings(("max_buffer_size is deprecated",
1209 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001210 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001211
1212 def test_constructor_with_not_readable(self):
1213 class NotReadable(MockRawIO):
1214 def readable(self):
1215 return False
1216
1217 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1218
1219 def test_constructor_with_not_writeable(self):
1220 class NotWriteable(MockRawIO):
1221 def writable(self):
1222 return False
1223
1224 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1225
1226 def test_read(self):
1227 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1228
1229 self.assertEqual(pair.read(3), b"abc")
1230 self.assertEqual(pair.read(1), b"d")
1231 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001232 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1233 self.assertEqual(pair.read(None), b"abc")
1234
1235 def test_readlines(self):
1236 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1237 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1238 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1239 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001240
1241 def test_read1(self):
1242 # .read1() is delegated to the underlying reader object, so this test
1243 # can be shallow.
1244 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1245
1246 self.assertEqual(pair.read1(3), b"abc")
1247
1248 def test_readinto(self):
1249 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1250
1251 data = bytearray(5)
1252 self.assertEqual(pair.readinto(data), 5)
1253 self.assertEqual(data, b"abcde")
1254
1255 def test_write(self):
1256 w = self.MockRawIO()
1257 pair = self.tp(self.MockRawIO(), w)
1258
1259 pair.write(b"abc")
1260 pair.flush()
1261 pair.write(b"def")
1262 pair.flush()
1263 self.assertEqual(w._write_stack, [b"abc", b"def"])
1264
1265 def test_peek(self):
1266 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1267
1268 self.assertTrue(pair.peek(3).startswith(b"abc"))
1269 self.assertEqual(pair.read(3), b"abc")
1270
1271 def test_readable(self):
1272 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1273 self.assertTrue(pair.readable())
1274
1275 def test_writeable(self):
1276 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1277 self.assertTrue(pair.writable())
1278
1279 def test_seekable(self):
1280 # BufferedRWPairs are never seekable, even if their readers and writers
1281 # are.
1282 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1283 self.assertFalse(pair.seekable())
1284
1285 # .flush() is delegated to the underlying writer object and has been
1286 # tested in the test_write method.
1287
1288 def test_close_and_closed(self):
1289 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1290 self.assertFalse(pair.closed)
1291 pair.close()
1292 self.assertTrue(pair.closed)
1293
1294 def test_isatty(self):
1295 class SelectableIsAtty(MockRawIO):
1296 def __init__(self, isatty):
1297 MockRawIO.__init__(self)
1298 self._isatty = isatty
1299
1300 def isatty(self):
1301 return self._isatty
1302
1303 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1304 self.assertFalse(pair.isatty())
1305
1306 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1307 self.assertTrue(pair.isatty())
1308
1309 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1310 self.assertTrue(pair.isatty())
1311
1312 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1313 self.assertTrue(pair.isatty())
1314
1315class CBufferedRWPairTest(BufferedRWPairTest):
1316 tp = io.BufferedRWPair
1317
1318class PyBufferedRWPairTest(BufferedRWPairTest):
1319 tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001320
1321
Antoine Pitrou19690592009-06-12 20:14:08 +00001322class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1323 read_mode = "rb+"
1324 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001325
Antoine Pitrou19690592009-06-12 20:14:08 +00001326 def test_constructor(self):
1327 BufferedReaderTest.test_constructor(self)
1328 BufferedWriterTest.test_constructor(self)
1329
1330 def test_read_and_write(self):
1331 raw = self.MockRawIO((b"asdf", b"ghjk"))
1332 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001333
1334 self.assertEqual(b"as", rw.read(2))
1335 rw.write(b"ddd")
1336 rw.write(b"eee")
1337 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001338 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001339 self.assertEqual(b"dddeee", raw._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001340
Antoine Pitrou19690592009-06-12 20:14:08 +00001341 def test_seek_and_tell(self):
1342 raw = self.BytesIO(b"asdfghjkl")
1343 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001344
Ezio Melotti2623a372010-11-21 13:34:58 +00001345 self.assertEqual(b"as", rw.read(2))
1346 self.assertEqual(2, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001347 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001348 self.assertEqual(b"asdf", rw.read(4))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001349
1350 rw.write(b"asdf")
1351 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001352 self.assertEqual(b"asdfasdfl", rw.read())
1353 self.assertEqual(9, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001354 rw.seek(-4, 2)
Ezio Melotti2623a372010-11-21 13:34:58 +00001355 self.assertEqual(5, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001356 rw.seek(2, 1)
Ezio Melotti2623a372010-11-21 13:34:58 +00001357 self.assertEqual(7, rw.tell())
1358 self.assertEqual(b"fl", rw.read(11))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001359 self.assertRaises(TypeError, rw.seek, 0.0)
1360
Antoine Pitrou19690592009-06-12 20:14:08 +00001361 def check_flush_and_read(self, read_func):
1362 raw = self.BytesIO(b"abcdefghi")
1363 bufio = self.tp(raw)
1364
Ezio Melotti2623a372010-11-21 13:34:58 +00001365 self.assertEqual(b"ab", read_func(bufio, 2))
Antoine Pitrou19690592009-06-12 20:14:08 +00001366 bufio.write(b"12")
Ezio Melotti2623a372010-11-21 13:34:58 +00001367 self.assertEqual(b"ef", read_func(bufio, 2))
1368 self.assertEqual(6, bufio.tell())
Antoine Pitrou19690592009-06-12 20:14:08 +00001369 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001370 self.assertEqual(6, bufio.tell())
1371 self.assertEqual(b"ghi", read_func(bufio))
Antoine Pitrou19690592009-06-12 20:14:08 +00001372 raw.seek(0, 0)
1373 raw.write(b"XYZ")
1374 # flush() resets the read buffer
1375 bufio.flush()
1376 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001377 self.assertEqual(b"XYZ", read_func(bufio, 3))
Antoine Pitrou19690592009-06-12 20:14:08 +00001378
1379 def test_flush_and_read(self):
1380 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1381
1382 def test_flush_and_readinto(self):
1383 def _readinto(bufio, n=-1):
1384 b = bytearray(n if n >= 0 else 9999)
1385 n = bufio.readinto(b)
1386 return bytes(b[:n])
1387 self.check_flush_and_read(_readinto)
1388
1389 def test_flush_and_peek(self):
1390 def _peek(bufio, n=-1):
1391 # This relies on the fact that the buffer can contain the whole
1392 # raw stream, otherwise peek() can return less.
1393 b = bufio.peek(n)
1394 if n != -1:
1395 b = b[:n]
1396 bufio.seek(len(b), 1)
1397 return b
1398 self.check_flush_and_read(_peek)
1399
1400 def test_flush_and_write(self):
1401 raw = self.BytesIO(b"abcdefghi")
1402 bufio = self.tp(raw)
1403
1404 bufio.write(b"123")
1405 bufio.flush()
1406 bufio.write(b"45")
1407 bufio.flush()
1408 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001409 self.assertEqual(b"12345fghi", raw.getvalue())
1410 self.assertEqual(b"12345fghi", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +00001411
1412 def test_threads(self):
1413 BufferedReaderTest.test_threads(self)
1414 BufferedWriterTest.test_threads(self)
1415
1416 def test_writes_and_peek(self):
1417 def _peek(bufio):
1418 bufio.peek(1)
1419 self.check_writes(_peek)
1420 def _peek(bufio):
1421 pos = bufio.tell()
1422 bufio.seek(-1, 1)
1423 bufio.peek(1)
1424 bufio.seek(pos, 0)
1425 self.check_writes(_peek)
1426
1427 def test_writes_and_reads(self):
1428 def _read(bufio):
1429 bufio.seek(-1, 1)
1430 bufio.read(1)
1431 self.check_writes(_read)
1432
1433 def test_writes_and_read1s(self):
1434 def _read1(bufio):
1435 bufio.seek(-1, 1)
1436 bufio.read1(1)
1437 self.check_writes(_read1)
1438
1439 def test_writes_and_readintos(self):
1440 def _read(bufio):
1441 bufio.seek(-1, 1)
1442 bufio.readinto(bytearray(1))
1443 self.check_writes(_read)
1444
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001445 def test_write_after_readahead(self):
1446 # Issue #6629: writing after the buffer was filled by readahead should
1447 # first rewind the raw stream.
1448 for overwrite_size in [1, 5]:
1449 raw = self.BytesIO(b"A" * 10)
1450 bufio = self.tp(raw, 4)
1451 # Trigger readahead
1452 self.assertEqual(bufio.read(1), b"A")
1453 self.assertEqual(bufio.tell(), 1)
1454 # Overwriting should rewind the raw stream if it needs so
1455 bufio.write(b"B" * overwrite_size)
1456 self.assertEqual(bufio.tell(), overwrite_size + 1)
1457 # If the write size was smaller than the buffer size, flush() and
1458 # check that rewind happens.
1459 bufio.flush()
1460 self.assertEqual(bufio.tell(), overwrite_size + 1)
1461 s = raw.getvalue()
1462 self.assertEqual(s,
1463 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1464
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001465 def test_truncate_after_read_or_write(self):
1466 raw = self.BytesIO(b"A" * 10)
1467 bufio = self.tp(raw, 100)
1468 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1469 self.assertEqual(bufio.truncate(), 2)
1470 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1471 self.assertEqual(bufio.truncate(), 4)
1472
Antoine Pitrou19690592009-06-12 20:14:08 +00001473 def test_misbehaved_io(self):
1474 BufferedReaderTest.test_misbehaved_io(self)
1475 BufferedWriterTest.test_misbehaved_io(self)
1476
1477class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
1478 tp = io.BufferedRandom
1479
1480 def test_constructor(self):
1481 BufferedRandomTest.test_constructor(self)
1482 # The allocation can succeed on 32-bit builds, e.g. with more
1483 # than 2GB RAM and a 64-bit kernel.
1484 if sys.maxsize > 0x7FFFFFFF:
1485 rawio = self.MockRawIO()
1486 bufio = self.tp(rawio)
1487 self.assertRaises((OverflowError, MemoryError, ValueError),
1488 bufio.__init__, rawio, sys.maxsize)
1489
1490 def test_garbage_collection(self):
1491 CBufferedReaderTest.test_garbage_collection(self)
1492 CBufferedWriterTest.test_garbage_collection(self)
1493
1494class PyBufferedRandomTest(BufferedRandomTest):
1495 tp = pyio.BufferedRandom
1496
1497
Christian Heimes1a6387e2008-03-26 12:49:49 +00001498# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1499# properties:
1500# - A single output character can correspond to many bytes of input.
1501# - The number of input bytes to complete the character can be
1502# undetermined until the last input byte is received.
1503# - The number of input bytes can vary depending on previous input.
1504# - A single input byte can correspond to many characters of output.
1505# - The number of output characters can be undetermined until the
1506# last input byte is received.
1507# - The number of output characters can vary depending on previous input.
1508
1509class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1510 """
1511 For testing seek/tell behavior with a stateful, buffering decoder.
1512
1513 Input is a sequence of words. Words may be fixed-length (length set
1514 by input) or variable-length (period-terminated). In variable-length
1515 mode, extra periods are ignored. Possible words are:
1516 - 'i' followed by a number sets the input length, I (maximum 99).
1517 When I is set to 0, words are space-terminated.
1518 - 'o' followed by a number sets the output length, O (maximum 99).
1519 - Any other word is converted into a word followed by a period on
1520 the output. The output word consists of the input word truncated
1521 or padded out with hyphens to make its length equal to O. If O
1522 is 0, the word is output verbatim without truncating or padding.
1523 I and O are initially set to 1. When I changes, any buffered input is
1524 re-scanned according to the new I. EOF also terminates the last word.
1525 """
1526
1527 def __init__(self, errors='strict'):
1528 codecs.IncrementalDecoder.__init__(self, errors)
1529 self.reset()
1530
1531 def __repr__(self):
1532 return '<SID %x>' % id(self)
1533
1534 def reset(self):
1535 self.i = 1
1536 self.o = 1
1537 self.buffer = bytearray()
1538
1539 def getstate(self):
1540 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1541 return bytes(self.buffer), i*100 + o
1542
1543 def setstate(self, state):
1544 buffer, io = state
1545 self.buffer = bytearray(buffer)
1546 i, o = divmod(io, 100)
1547 self.i, self.o = i ^ 1, o ^ 1
1548
1549 def decode(self, input, final=False):
1550 output = ''
1551 for b in input:
1552 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001553 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +00001554 if self.buffer:
1555 output += self.process_word()
1556 else:
1557 self.buffer.append(b)
1558 else: # fixed-length, terminate after self.i bytes
1559 self.buffer.append(b)
1560 if len(self.buffer) == self.i:
1561 output += self.process_word()
1562 if final and self.buffer: # EOF terminates the last word
1563 output += self.process_word()
1564 return output
1565
1566 def process_word(self):
1567 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001568 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001569 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001570 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001571 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1572 else:
1573 output = self.buffer.decode('ascii')
1574 if len(output) < self.o:
1575 output += '-'*self.o # pad out with hyphens
1576 if self.o:
1577 output = output[:self.o] # truncate to output length
1578 output += '.'
1579 self.buffer = bytearray()
1580 return output
1581
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001582 codecEnabled = False
1583
1584 @classmethod
1585 def lookupTestDecoder(cls, name):
1586 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001587 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001588 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001589 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001590 incrementalencoder=None,
1591 streamreader=None, streamwriter=None,
1592 incrementaldecoder=cls)
1593
1594# Register the previous decoder for testing.
1595# Disabled by default, tests will enable it.
1596codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1597
1598
Christian Heimes1a6387e2008-03-26 12:49:49 +00001599class StatefulIncrementalDecoderTest(unittest.TestCase):
1600 """
1601 Make sure the StatefulIncrementalDecoder actually works.
1602 """
1603
1604 test_cases = [
1605 # I=1, O=1 (fixed-length input == fixed-length output)
1606 (b'abcd', False, 'a.b.c.d.'),
1607 # I=0, O=0 (variable-length input, variable-length output)
1608 (b'oiabcd', True, 'abcd.'),
1609 # I=0, O=0 (should ignore extra periods)
1610 (b'oi...abcd...', True, 'abcd.'),
1611 # I=0, O=6 (variable-length input, fixed-length output)
1612 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1613 # I=2, O=6 (fixed-length input < fixed-length output)
1614 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1615 # I=6, O=3 (fixed-length input > fixed-length output)
1616 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1617 # I=0, then 3; O=29, then 15 (with longer output)
1618 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1619 'a----------------------------.' +
1620 'b----------------------------.' +
1621 'cde--------------------------.' +
1622 'abcdefghijabcde.' +
1623 'a.b------------.' +
1624 '.c.------------.' +
1625 'd.e------------.' +
1626 'k--------------.' +
1627 'l--------------.' +
1628 'm--------------.')
1629 ]
1630
Antoine Pitrou19690592009-06-12 20:14:08 +00001631 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001632 # Try a few one-shot test cases.
1633 for input, eof, output in self.test_cases:
1634 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001635 self.assertEqual(d.decode(input, eof), output)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001636
1637 # Also test an unfinished decode, followed by forcing EOF.
1638 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001639 self.assertEqual(d.decode(b'oiabcd'), '')
1640 self.assertEqual(d.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001641
1642class TextIOWrapperTest(unittest.TestCase):
1643
1644 def setUp(self):
1645 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1646 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00001647 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001648
1649 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001650 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001651
Antoine Pitrou19690592009-06-12 20:14:08 +00001652 def test_constructor(self):
1653 r = self.BytesIO(b"\xc3\xa9\n\n")
1654 b = self.BufferedReader(r, 1000)
1655 t = self.TextIOWrapper(b)
1656 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti2623a372010-11-21 13:34:58 +00001657 self.assertEqual(t.encoding, "latin1")
1658 self.assertEqual(t.line_buffering, False)
Antoine Pitrou19690592009-06-12 20:14:08 +00001659 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti2623a372010-11-21 13:34:58 +00001660 self.assertEqual(t.encoding, "utf8")
1661 self.assertEqual(t.line_buffering, True)
1662 self.assertEqual("\xe9\n", t.readline())
Antoine Pitrou19690592009-06-12 20:14:08 +00001663 self.assertRaises(TypeError, t.__init__, b, newline=42)
1664 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1665
1666 def test_detach(self):
1667 r = self.BytesIO()
1668 b = self.BufferedWriter(r)
1669 t = self.TextIOWrapper(b)
1670 self.assertIs(t.detach(), b)
1671
1672 t = self.TextIOWrapper(b, encoding="ascii")
1673 t.write("howdy")
1674 self.assertFalse(r.getvalue())
1675 t.detach()
1676 self.assertEqual(r.getvalue(), b"howdy")
1677 self.assertRaises(ValueError, t.detach)
1678
1679 def test_repr(self):
1680 raw = self.BytesIO("hello".encode("utf-8"))
1681 b = self.BufferedReader(raw)
1682 t = self.TextIOWrapper(b, encoding="utf-8")
1683 modname = self.TextIOWrapper.__module__
1684 self.assertEqual(repr(t),
1685 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1686 raw.name = "dummy"
1687 self.assertEqual(repr(t),
1688 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1689 raw.name = b"dummy"
1690 self.assertEqual(repr(t),
1691 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1692
1693 def test_line_buffering(self):
1694 r = self.BytesIO()
1695 b = self.BufferedWriter(r, 1000)
1696 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1697 t.write("X")
Ezio Melotti2623a372010-11-21 13:34:58 +00001698 self.assertEqual(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001699 t.write("Y\nZ")
Ezio Melotti2623a372010-11-21 13:34:58 +00001700 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001701 t.write("A\rB")
Ezio Melotti2623a372010-11-21 13:34:58 +00001702 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001703
Antoine Pitrou19690592009-06-12 20:14:08 +00001704 def test_encoding(self):
1705 # Check the encoding attribute is always set, and valid
1706 b = self.BytesIO()
1707 t = self.TextIOWrapper(b, encoding="utf8")
1708 self.assertEqual(t.encoding, "utf8")
1709 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001710 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001711 codecs.lookup(t.encoding)
1712
1713 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001714 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001715 b = self.BytesIO(b"abc\n\xff\n")
1716 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001717 self.assertRaises(UnicodeError, t.read)
1718 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001719 b = self.BytesIO(b"abc\n\xff\n")
1720 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001721 self.assertRaises(UnicodeError, t.read)
1722 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001723 b = self.BytesIO(b"abc\n\xff\n")
1724 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti2623a372010-11-21 13:34:58 +00001725 self.assertEqual(t.read(), "abc\n\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001726 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001727 b = self.BytesIO(b"abc\n\xff\n")
1728 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti2623a372010-11-21 13:34:58 +00001729 self.assertEqual(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001730
Antoine Pitrou19690592009-06-12 20:14:08 +00001731 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001732 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001733 b = self.BytesIO()
1734 t = self.TextIOWrapper(b, encoding="ascii")
1735 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001736 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001737 b = self.BytesIO()
1738 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1739 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001740 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001741 b = self.BytesIO()
1742 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001743 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001744 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001745 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001746 self.assertEqual(b.getvalue(), b"abcdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001747 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001748 b = self.BytesIO()
1749 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001750 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001751 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001752 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001753 self.assertEqual(b.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001754
Antoine Pitrou19690592009-06-12 20:14:08 +00001755 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001756 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1757
1758 tests = [
1759 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1760 [ '', input_lines ],
1761 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1762 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1763 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1764 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001765 encodings = (
1766 'utf-8', 'latin-1',
1767 'utf-16', 'utf-16-le', 'utf-16-be',
1768 'utf-32', 'utf-32-le', 'utf-32-be',
1769 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00001770
1771 # Try a range of buffer sizes to test the case where \r is the last
1772 # character in TextIOWrapper._pending_line.
1773 for encoding in encodings:
1774 # XXX: str.encode() should return bytes
1775 data = bytes(''.join(input_lines).encode(encoding))
1776 for do_reads in (False, True):
1777 for bufsize in range(1, 10):
1778 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001779 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1780 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00001781 encoding=encoding)
1782 if do_reads:
1783 got_lines = []
1784 while True:
1785 c2 = textio.read(2)
1786 if c2 == '':
1787 break
Ezio Melotti2623a372010-11-21 13:34:58 +00001788 self.assertEqual(len(c2), 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001789 got_lines.append(c2 + textio.readline())
1790 else:
1791 got_lines = list(textio)
1792
1793 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti2623a372010-11-21 13:34:58 +00001794 self.assertEqual(got_line, exp_line)
1795 self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001796
Antoine Pitrou19690592009-06-12 20:14:08 +00001797 def test_newlines_input(self):
1798 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001799 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1800 for newline, expected in [
1801 (None, normalized.decode("ascii").splitlines(True)),
1802 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00001803 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1804 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1805 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00001806 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00001807 buf = self.BytesIO(testdata)
1808 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti2623a372010-11-21 13:34:58 +00001809 self.assertEqual(txt.readlines(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001810 txt.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001811 self.assertEqual(txt.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001812
Antoine Pitrou19690592009-06-12 20:14:08 +00001813 def test_newlines_output(self):
1814 testdict = {
1815 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1816 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1817 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1818 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1819 }
1820 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1821 for newline, expected in tests:
1822 buf = self.BytesIO()
1823 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1824 txt.write("AAA\nB")
1825 txt.write("BB\nCCC\n")
1826 txt.write("X\rY\r\nZ")
1827 txt.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001828 self.assertEqual(buf.closed, False)
1829 self.assertEqual(buf.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00001830
1831 def test_destructor(self):
1832 l = []
1833 base = self.BytesIO
1834 class MyBytesIO(base):
1835 def close(self):
1836 l.append(self.getvalue())
1837 base.close(self)
1838 b = MyBytesIO()
1839 t = self.TextIOWrapper(b, encoding="ascii")
1840 t.write("abc")
1841 del t
1842 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001843 self.assertEqual([b"abc"], l)
Antoine Pitrou19690592009-06-12 20:14:08 +00001844
1845 def test_override_destructor(self):
1846 record = []
1847 class MyTextIO(self.TextIOWrapper):
1848 def __del__(self):
1849 record.append(1)
1850 try:
1851 f = super(MyTextIO, self).__del__
1852 except AttributeError:
1853 pass
1854 else:
1855 f()
1856 def close(self):
1857 record.append(2)
1858 super(MyTextIO, self).close()
1859 def flush(self):
1860 record.append(3)
1861 super(MyTextIO, self).flush()
1862 b = self.BytesIO()
1863 t = MyTextIO(b, encoding="ascii")
1864 del t
1865 support.gc_collect()
1866 self.assertEqual(record, [1, 2, 3])
1867
1868 def test_error_through_destructor(self):
1869 # Test that the exception state is not modified by a destructor,
1870 # even if close() fails.
1871 rawio = self.CloseFailureIO()
1872 def f():
1873 self.TextIOWrapper(rawio).xyzzy
1874 with support.captured_output("stderr") as s:
1875 self.assertRaises(AttributeError, f)
1876 s = s.getvalue().strip()
1877 if s:
1878 # The destructor *may* have printed an unraisable error, check it
1879 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001880 self.assertTrue(s.startswith("Exception IOError: "), s)
1881 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001882
1883 # Systematic tests of the text I/O API
1884
Antoine Pitrou19690592009-06-12 20:14:08 +00001885 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001886 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1887 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00001888 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001889 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00001890 self.assertEqual(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001891 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001892 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001893 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00001894 self.assertEqual(f.tell(), 0)
1895 self.assertEqual(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001896 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00001897 self.assertEqual(f.seek(0), 0)
1898 self.assertEqual(f.read(None), "abc")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001899 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001900 self.assertEqual(f.read(2), "ab")
1901 self.assertEqual(f.read(1), "c")
1902 self.assertEqual(f.read(1), "")
1903 self.assertEqual(f.read(), "")
1904 self.assertEqual(f.tell(), cookie)
1905 self.assertEqual(f.seek(0), 0)
1906 self.assertEqual(f.seek(0, 2), cookie)
1907 self.assertEqual(f.write("def"), 3)
1908 self.assertEqual(f.seek(cookie), cookie)
1909 self.assertEqual(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001910 if enc.startswith("utf"):
1911 self.multi_line_test(f, enc)
1912 f.close()
1913
1914 def multi_line_test(self, f, enc):
1915 f.seek(0)
1916 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00001917 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001918 wlines = []
1919 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
1920 chars = []
1921 for i in range(size):
1922 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00001923 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001924 wlines.append((f.tell(), line))
1925 f.write(line)
1926 f.seek(0)
1927 rlines = []
1928 while True:
1929 pos = f.tell()
1930 line = f.readline()
1931 if not line:
1932 break
1933 rlines.append((pos, line))
Ezio Melotti2623a372010-11-21 13:34:58 +00001934 self.assertEqual(rlines, wlines)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001935
Antoine Pitrou19690592009-06-12 20:14:08 +00001936 def test_telling(self):
1937 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001938 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001939 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001940 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001941 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001942 p2 = f.tell()
1943 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001944 self.assertEqual(f.tell(), p0)
1945 self.assertEqual(f.readline(), "\xff\n")
1946 self.assertEqual(f.tell(), p1)
1947 self.assertEqual(f.readline(), "\xff\n")
1948 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001949 f.seek(0)
1950 for line in f:
Ezio Melotti2623a372010-11-21 13:34:58 +00001951 self.assertEqual(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001952 self.assertRaises(IOError, f.tell)
Ezio Melotti2623a372010-11-21 13:34:58 +00001953 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001954 f.close()
1955
Antoine Pitrou19690592009-06-12 20:14:08 +00001956 def test_seeking(self):
1957 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00001958 prefix_size = chunk_size - 2
1959 u_prefix = "a" * prefix_size
1960 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melotti2623a372010-11-21 13:34:58 +00001961 self.assertEqual(len(u_prefix), len(prefix))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001962 u_suffix = "\u8888\n"
1963 suffix = bytes(u_suffix.encode("utf-8"))
1964 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00001965 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001966 f.write(line*2)
1967 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001968 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001969 s = f.read(prefix_size)
Ezio Melotti2623a372010-11-21 13:34:58 +00001970 self.assertEqual(s, prefix.decode("ascii"))
1971 self.assertEqual(f.tell(), prefix_size)
1972 self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001973
Antoine Pitrou19690592009-06-12 20:14:08 +00001974 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001975 # Regression test for a specific bug
1976 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00001977 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001978 f.write(data)
1979 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001980 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001981 f._CHUNK_SIZE # Just test that it exists
1982 f._CHUNK_SIZE = 2
1983 f.readline()
1984 f.tell()
1985
Antoine Pitrou19690592009-06-12 20:14:08 +00001986 def test_seek_and_tell(self):
1987 #Test seek/tell using the StatefulIncrementalDecoder.
1988 # Make test faster by doing smaller seeks
1989 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00001990
Antoine Pitrou19690592009-06-12 20:14:08 +00001991 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001992 """Tell/seek to various points within a data stream and ensure
1993 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00001994 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001995 f.write(data)
1996 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001997 f = self.open(support.TESTFN, encoding='test_decoder')
1998 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00001999 decoded = f.read()
2000 f.close()
2001
2002 for i in range(min_pos, len(decoded) + 1): # seek positions
2003 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00002004 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melotti2623a372010-11-21 13:34:58 +00002005 self.assertEqual(f.read(i), decoded[:i])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002006 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002007 self.assertEqual(f.read(j), decoded[i:i + j])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002008 f.seek(cookie)
Ezio Melotti2623a372010-11-21 13:34:58 +00002009 self.assertEqual(f.read(), decoded[i:])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002010 f.close()
2011
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002012 # Enable the test decoder.
2013 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00002014
2015 # Run the tests.
2016 try:
2017 # Try each test case.
2018 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00002019 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002020
2021 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00002022 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2023 offset = CHUNK_SIZE - len(input)//2
2024 prefix = b'.'*offset
2025 # Don't bother seeking into the prefix (takes too long).
2026 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00002027 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002028
2029 # Ensure our test decoder won't interfere with subsequent tests.
2030 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002031 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002032
Antoine Pitrou19690592009-06-12 20:14:08 +00002033 def test_encoded_writes(self):
2034 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002035 tests = ("utf-16",
2036 "utf-16-le",
2037 "utf-16-be",
2038 "utf-32",
2039 "utf-32-le",
2040 "utf-32-be")
2041 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002042 buf = self.BytesIO()
2043 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002044 # Check if the BOM is written only once (see issue1753).
2045 f.write(data)
2046 f.write(data)
2047 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002048 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002049 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002050 self.assertEqual(f.read(), data * 2)
2051 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002052
Antoine Pitrou19690592009-06-12 20:14:08 +00002053 def test_unreadable(self):
2054 class UnReadable(self.BytesIO):
2055 def readable(self):
2056 return False
2057 txt = self.TextIOWrapper(UnReadable())
2058 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002059
Antoine Pitrou19690592009-06-12 20:14:08 +00002060 def test_read_one_by_one(self):
2061 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002062 reads = ""
2063 while True:
2064 c = txt.read(1)
2065 if not c:
2066 break
2067 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002068 self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002069
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002070 def test_readlines(self):
2071 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2072 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2073 txt.seek(0)
2074 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2075 txt.seek(0)
2076 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2077
Christian Heimes1a6387e2008-03-26 12:49:49 +00002078 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002079 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002080 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002081 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002082 reads = ""
2083 while True:
2084 c = txt.read(128)
2085 if not c:
2086 break
2087 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002088 self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002089
2090 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002091 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002092
2093 # read one char at a time
2094 reads = ""
2095 while True:
2096 c = txt.read(1)
2097 if not c:
2098 break
2099 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002100 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002101
2102 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002103 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002104 txt._CHUNK_SIZE = 4
2105
2106 reads = ""
2107 while True:
2108 c = txt.read(4)
2109 if not c:
2110 break
2111 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002112 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002113
2114 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002115 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002116 txt._CHUNK_SIZE = 4
2117
2118 reads = txt.read(4)
2119 reads += txt.read(4)
2120 reads += txt.readline()
2121 reads += txt.readline()
2122 reads += txt.readline()
Ezio Melotti2623a372010-11-21 13:34:58 +00002123 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002124
2125 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002126 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002127 txt._CHUNK_SIZE = 4
2128
2129 reads = txt.read(4)
2130 reads += txt.read()
Ezio Melotti2623a372010-11-21 13:34:58 +00002131 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002132
2133 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002134 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002135 txt._CHUNK_SIZE = 4
2136
2137 reads = txt.read(4)
2138 pos = txt.tell()
2139 txt.seek(0)
2140 txt.seek(pos)
Ezio Melotti2623a372010-11-21 13:34:58 +00002141 self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002142
2143 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002144 buffer = self.BytesIO(self.testdata)
2145 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002146
2147 self.assertEqual(buffer.seekable(), txt.seekable())
2148
Antoine Pitrou19690592009-06-12 20:14:08 +00002149 def test_append_bom(self):
2150 # The BOM is not written again when appending to a non-empty file
2151 filename = support.TESTFN
2152 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2153 with self.open(filename, 'w', encoding=charset) as f:
2154 f.write('aaa')
2155 pos = f.tell()
2156 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002157 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002158
2159 with self.open(filename, 'a', encoding=charset) as f:
2160 f.write('xxx')
2161 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002162 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002163
Antoine Pitrou19690592009-06-12 20:14:08 +00002164 def test_seek_bom(self):
2165 # Same test, but when seeking manually
2166 filename = support.TESTFN
2167 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2168 with self.open(filename, 'w', encoding=charset) as f:
2169 f.write('aaa')
2170 pos = f.tell()
2171 with self.open(filename, 'r+', encoding=charset) as f:
2172 f.seek(pos)
2173 f.write('zzz')
2174 f.seek(0)
2175 f.write('bbb')
2176 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002177 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002178
2179 def test_errors_property(self):
2180 with self.open(support.TESTFN, "w") as f:
2181 self.assertEqual(f.errors, "strict")
2182 with self.open(support.TESTFN, "w", errors="replace") as f:
2183 self.assertEqual(f.errors, "replace")
2184
Victor Stinner6a102812010-04-27 23:55:59 +00002185 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002186 def test_threads_write(self):
2187 # Issue6750: concurrent writes could duplicate data
2188 event = threading.Event()
2189 with self.open(support.TESTFN, "w", buffering=1) as f:
2190 def run(n):
2191 text = "Thread%03d\n" % n
2192 event.wait()
2193 f.write(text)
2194 threads = [threading.Thread(target=lambda n=x: run(n))
2195 for x in range(20)]
2196 for t in threads:
2197 t.start()
2198 time.sleep(0.02)
2199 event.set()
2200 for t in threads:
2201 t.join()
2202 with self.open(support.TESTFN) as f:
2203 content = f.read()
2204 for n in range(20):
Ezio Melotti2623a372010-11-21 13:34:58 +00002205 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002206
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002207 def test_flush_error_on_close(self):
2208 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2209 def bad_flush():
2210 raise IOError()
2211 txt.flush = bad_flush
2212 self.assertRaises(IOError, txt.close) # exception not swallowed
2213
2214 def test_multi_close(self):
2215 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2216 txt.close()
2217 txt.close()
2218 txt.close()
2219 self.assertRaises(ValueError, txt.flush)
2220
Antoine Pitroufc9ead62010-12-21 21:26:55 +00002221 def test_readonly_attributes(self):
2222 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2223 buf = self.BytesIO(self.testdata)
2224 with self.assertRaises((AttributeError, TypeError)):
2225 txt.buffer = buf
2226
Antoine Pitrou19690592009-06-12 20:14:08 +00002227class CTextIOWrapperTest(TextIOWrapperTest):
2228
2229 def test_initialization(self):
2230 r = self.BytesIO(b"\xc3\xa9\n\n")
2231 b = self.BufferedReader(r, 1000)
2232 t = self.TextIOWrapper(b)
2233 self.assertRaises(TypeError, t.__init__, b, newline=42)
2234 self.assertRaises(ValueError, t.read)
2235 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2236 self.assertRaises(ValueError, t.read)
2237
2238 def test_garbage_collection(self):
2239 # C TextIOWrapper objects are collected, and collecting them flushes
2240 # all data to disk.
2241 # The Python version has __del__, so it ends in gc.garbage instead.
2242 rawio = io.FileIO(support.TESTFN, "wb")
2243 b = self.BufferedWriter(rawio)
2244 t = self.TextIOWrapper(b, encoding="ascii")
2245 t.write("456def")
2246 t.x = t
2247 wr = weakref.ref(t)
2248 del t
2249 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002250 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002251 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002252 self.assertEqual(f.read(), b"456def")
2253
2254class PyTextIOWrapperTest(TextIOWrapperTest):
2255 pass
2256
2257
2258class IncrementalNewlineDecoderTest(unittest.TestCase):
2259
2260 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002261 # UTF-8 specific tests for a newline decoder
2262 def _check_decode(b, s, **kwargs):
2263 # We exercise getstate() / setstate() as well as decode()
2264 state = decoder.getstate()
Ezio Melotti2623a372010-11-21 13:34:58 +00002265 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002266 decoder.setstate(state)
Ezio Melotti2623a372010-11-21 13:34:58 +00002267 self.assertEqual(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002268
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002269 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002270
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002271 _check_decode(b'\xe8', "")
2272 _check_decode(b'\xa2', "")
2273 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002274
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002275 _check_decode(b'\xe8', "")
2276 _check_decode(b'\xa2', "")
2277 _check_decode(b'\x88', "\u8888")
2278
2279 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002280 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2281
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002282 decoder.reset()
2283 _check_decode(b'\n', "\n")
2284 _check_decode(b'\r', "")
2285 _check_decode(b'', "\n", final=True)
2286 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002287
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002288 _check_decode(b'\r', "")
2289 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002290
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002291 _check_decode(b'\r\r\n', "\n\n")
2292 _check_decode(b'\r', "")
2293 _check_decode(b'\r', "\n")
2294 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002295
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002296 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2297 _check_decode(b'\xe8\xa2\x88', "\u8888")
2298 _check_decode(b'\n', "\n")
2299 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2300 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002301
Antoine Pitrou19690592009-06-12 20:14:08 +00002302 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002303 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002304 if encoding is not None:
2305 encoder = codecs.getincrementalencoder(encoding)()
2306 def _decode_bytewise(s):
2307 # Decode one byte at a time
2308 for b in encoder.encode(s):
2309 result.append(decoder.decode(b))
2310 else:
2311 encoder = None
2312 def _decode_bytewise(s):
2313 # Decode one char at a time
2314 for c in s:
2315 result.append(decoder.decode(c))
Ezio Melotti2623a372010-11-21 13:34:58 +00002316 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002317 _decode_bytewise("abc\n\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002318 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002319 _decode_bytewise("\nabc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002320 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002321 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002322 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002323 _decode_bytewise("abc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002324 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002325 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002326 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002327 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002328 input = "abc"
2329 if encoder is not None:
2330 encoder.reset()
2331 input = encoder.encode(input)
Ezio Melotti2623a372010-11-21 13:34:58 +00002332 self.assertEqual(decoder.decode(input), "abc")
2333 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002334
2335 def test_newline_decoder(self):
2336 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002337 # None meaning the IncrementalNewlineDecoder takes unicode input
2338 # rather than bytes input
2339 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002340 'utf-16', 'utf-16-le', 'utf-16-be',
2341 'utf-32', 'utf-32-le', 'utf-32-be',
2342 )
2343 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002344 decoder = enc and codecs.getincrementaldecoder(enc)()
2345 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2346 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002347 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002348 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2349 self.check_newline_decoding_utf8(decoder)
2350
2351 def test_newline_bytes(self):
2352 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2353 def _check(dec):
Ezio Melotti2623a372010-11-21 13:34:58 +00002354 self.assertEqual(dec.newlines, None)
2355 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2356 self.assertEqual(dec.newlines, None)
2357 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2358 self.assertEqual(dec.newlines, None)
Antoine Pitrou19690592009-06-12 20:14:08 +00002359 dec = self.IncrementalNewlineDecoder(None, translate=False)
2360 _check(dec)
2361 dec = self.IncrementalNewlineDecoder(None, translate=True)
2362 _check(dec)
2363
2364class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2365 pass
2366
2367class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2368 pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002369
Christian Heimes1a6387e2008-03-26 12:49:49 +00002370
2371# XXX Tests for open()
2372
2373class MiscIOTest(unittest.TestCase):
2374
Benjamin Petersonad100c32008-11-20 22:06:22 +00002375 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002376 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002377
Antoine Pitrou19690592009-06-12 20:14:08 +00002378 def test___all__(self):
2379 for name in self.io.__all__:
2380 obj = getattr(self.io, name, None)
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002381 self.assertTrue(obj is not None, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002382 if name == "open":
2383 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002384 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002385 self.assertTrue(issubclass(obj, Exception), name)
2386 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002387 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002388
Benjamin Petersonad100c32008-11-20 22:06:22 +00002389 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002390 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002391 self.assertEqual(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002392 f.close()
2393
Antoine Pitrou19690592009-06-12 20:14:08 +00002394 f = self.open(support.TESTFN, "U")
Ezio Melotti2623a372010-11-21 13:34:58 +00002395 self.assertEqual(f.name, support.TESTFN)
2396 self.assertEqual(f.buffer.name, support.TESTFN)
2397 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2398 self.assertEqual(f.mode, "U")
2399 self.assertEqual(f.buffer.mode, "rb")
2400 self.assertEqual(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002401 f.close()
2402
Antoine Pitrou19690592009-06-12 20:14:08 +00002403 f = self.open(support.TESTFN, "w+")
Ezio Melotti2623a372010-11-21 13:34:58 +00002404 self.assertEqual(f.mode, "w+")
2405 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2406 self.assertEqual(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002407
Antoine Pitrou19690592009-06-12 20:14:08 +00002408 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melotti2623a372010-11-21 13:34:58 +00002409 self.assertEqual(g.mode, "wb")
2410 self.assertEqual(g.raw.mode, "wb")
2411 self.assertEqual(g.name, f.fileno())
2412 self.assertEqual(g.raw.name, f.fileno())
Benjamin Petersonad100c32008-11-20 22:06:22 +00002413 f.close()
2414 g.close()
2415
Antoine Pitrou19690592009-06-12 20:14:08 +00002416 def test_io_after_close(self):
2417 for kwargs in [
2418 {"mode": "w"},
2419 {"mode": "wb"},
2420 {"mode": "w", "buffering": 1},
2421 {"mode": "w", "buffering": 2},
2422 {"mode": "wb", "buffering": 0},
2423 {"mode": "r"},
2424 {"mode": "rb"},
2425 {"mode": "r", "buffering": 1},
2426 {"mode": "r", "buffering": 2},
2427 {"mode": "rb", "buffering": 0},
2428 {"mode": "w+"},
2429 {"mode": "w+b"},
2430 {"mode": "w+", "buffering": 1},
2431 {"mode": "w+", "buffering": 2},
2432 {"mode": "w+b", "buffering": 0},
2433 ]:
2434 f = self.open(support.TESTFN, **kwargs)
2435 f.close()
2436 self.assertRaises(ValueError, f.flush)
2437 self.assertRaises(ValueError, f.fileno)
2438 self.assertRaises(ValueError, f.isatty)
2439 self.assertRaises(ValueError, f.__iter__)
2440 if hasattr(f, "peek"):
2441 self.assertRaises(ValueError, f.peek, 1)
2442 self.assertRaises(ValueError, f.read)
2443 if hasattr(f, "read1"):
2444 self.assertRaises(ValueError, f.read1, 1024)
2445 if hasattr(f, "readinto"):
2446 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2447 self.assertRaises(ValueError, f.readline)
2448 self.assertRaises(ValueError, f.readlines)
2449 self.assertRaises(ValueError, f.seek, 0)
2450 self.assertRaises(ValueError, f.tell)
2451 self.assertRaises(ValueError, f.truncate)
2452 self.assertRaises(ValueError, f.write,
2453 b"" if "b" in kwargs['mode'] else "")
2454 self.assertRaises(ValueError, f.writelines, [])
2455 self.assertRaises(ValueError, next, f)
2456
2457 def test_blockingioerror(self):
2458 # Various BlockingIOError issues
2459 self.assertRaises(TypeError, self.BlockingIOError)
2460 self.assertRaises(TypeError, self.BlockingIOError, 1)
2461 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2462 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2463 b = self.BlockingIOError(1, "")
2464 self.assertEqual(b.characters_written, 0)
2465 class C(unicode):
2466 pass
2467 c = C("")
2468 b = self.BlockingIOError(1, c)
2469 c.b = b
2470 b.c = c
2471 wr = weakref.ref(c)
2472 del c, b
2473 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002474 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002475
2476 def test_abcs(self):
2477 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002478 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2479 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2480 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2481 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002482
2483 def _check_abc_inheritance(self, abcmodule):
2484 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002485 self.assertIsInstance(f, abcmodule.IOBase)
2486 self.assertIsInstance(f, abcmodule.RawIOBase)
2487 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2488 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002489 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002490 self.assertIsInstance(f, abcmodule.IOBase)
2491 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2492 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2493 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002494 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002495 self.assertIsInstance(f, abcmodule.IOBase)
2496 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2497 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2498 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002499
2500 def test_abc_inheritance(self):
2501 # Test implementations inherit from their respective ABCs
2502 self._check_abc_inheritance(self)
2503
2504 def test_abc_inheritance_official(self):
2505 # Test implementations inherit from the official ABCs of the
2506 # baseline "io" module.
2507 self._check_abc_inheritance(io)
2508
2509class CMiscIOTest(MiscIOTest):
2510 io = io
2511
2512class PyMiscIOTest(MiscIOTest):
2513 io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002514
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002515
2516@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2517class SignalsTest(unittest.TestCase):
2518
2519 def setUp(self):
2520 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2521
2522 def tearDown(self):
2523 signal.signal(signal.SIGALRM, self.oldalrm)
2524
2525 def alarm_interrupt(self, sig, frame):
Florent Xicluna3fa3b002010-09-13 08:20:19 +00002526 1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002527
2528 @unittest.skipUnless(threading, 'Threading required for this test.')
2529 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2530 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou6439c002011-02-25 21:35:47 +00002531 invokes the signal handler, and bubbles up the exception raised
2532 in the latter."""
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002533 read_results = []
2534 def _read():
2535 s = os.read(r, 1)
2536 read_results.append(s)
2537 t = threading.Thread(target=_read)
2538 t.daemon = True
2539 r, w = os.pipe()
2540 try:
2541 wio = self.io.open(w, **fdopen_kwargs)
2542 t.start()
2543 signal.alarm(1)
2544 # Fill the pipe enough that the write will be blocking.
2545 # It will be interrupted by the timer armed above. Since the
2546 # other thread has read one byte, the low-level write will
2547 # return with a successful (partial) result rather than an EINTR.
2548 # The buffered IO layer must check for pending signal
2549 # handlers, which in this case will invoke alarm_interrupt().
2550 self.assertRaises(ZeroDivisionError,
2551 wio.write, item * (1024 * 1024))
2552 t.join()
2553 # We got one byte, get another one and check that it isn't a
2554 # repeat of the first one.
2555 read_results.append(os.read(r, 1))
2556 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2557 finally:
2558 os.close(w)
2559 os.close(r)
2560 # This is deliberate. If we didn't close the file descriptor
2561 # before closing wio, wio would try to flush its internal
2562 # buffer, and block again.
2563 try:
2564 wio.close()
2565 except IOError as e:
2566 if e.errno != errno.EBADF:
2567 raise
2568
2569 def test_interrupted_write_unbuffered(self):
2570 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2571
2572 def test_interrupted_write_buffered(self):
2573 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2574
2575 def test_interrupted_write_text(self):
2576 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2577
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002578 def check_reentrant_write(self, data, **fdopen_kwargs):
2579 def on_alarm(*args):
2580 # Will be called reentrantly from the same thread
2581 wio.write(data)
2582 1/0
2583 signal.signal(signal.SIGALRM, on_alarm)
2584 r, w = os.pipe()
2585 wio = self.io.open(w, **fdopen_kwargs)
2586 try:
2587 signal.alarm(1)
2588 # Either the reentrant call to wio.write() fails with RuntimeError,
2589 # or the signal handler raises ZeroDivisionError.
2590 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2591 while 1:
2592 for i in range(100):
2593 wio.write(data)
2594 wio.flush()
2595 # Make sure the buffer doesn't fill up and block further writes
2596 os.read(r, len(data) * 100)
2597 exc = cm.exception
2598 if isinstance(exc, RuntimeError):
2599 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2600 finally:
2601 wio.close()
2602 os.close(r)
2603
2604 def test_reentrant_write_buffered(self):
2605 self.check_reentrant_write(b"xy", mode="wb")
2606
2607 def test_reentrant_write_text(self):
2608 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2609
Antoine Pitrou6439c002011-02-25 21:35:47 +00002610 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2611 """Check that a buffered read, when it gets interrupted (either
2612 returning a partial result or EINTR), properly invokes the signal
2613 handler and retries if the latter returned successfully."""
2614 r, w = os.pipe()
2615 fdopen_kwargs["closefd"] = False
2616 def alarm_handler(sig, frame):
2617 os.write(w, b"bar")
2618 signal.signal(signal.SIGALRM, alarm_handler)
2619 try:
2620 rio = self.io.open(r, **fdopen_kwargs)
2621 os.write(w, b"foo")
2622 signal.alarm(1)
2623 # Expected behaviour:
2624 # - first raw read() returns partial b"foo"
2625 # - second raw read() returns EINTR
2626 # - third raw read() returns b"bar"
2627 self.assertEqual(decode(rio.read(6)), "foobar")
2628 finally:
2629 rio.close()
2630 os.close(w)
2631 os.close(r)
2632
2633 def test_interrupterd_read_retry_buffered(self):
2634 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
2635 mode="rb")
2636
2637 def test_interrupterd_read_retry_text(self):
2638 self.check_interrupted_read_retry(lambda x: x,
2639 mode="r")
2640
2641 @unittest.skipUnless(threading, 'Threading required for this test.')
2642 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
2643 """Check that a buffered write, when it gets interrupted (either
2644 returning a partial result or EINTR), properly invokes the signal
2645 handler and retries if the latter returned successfully."""
2646 select = support.import_module("select")
2647 # A quantity that exceeds the buffer size of an anonymous pipe's
2648 # write end.
2649 N = 1024 * 1024
2650 r, w = os.pipe()
2651 fdopen_kwargs["closefd"] = False
2652 # We need a separate thread to read from the pipe and allow the
2653 # write() to finish. This thread is started after the SIGALRM is
2654 # received (forcing a first EINTR in write()).
2655 read_results = []
2656 write_finished = False
2657 def _read():
2658 while not write_finished:
2659 while r in select.select([r], [], [], 1.0)[0]:
2660 s = os.read(r, 1024)
2661 read_results.append(s)
2662 t = threading.Thread(target=_read)
2663 t.daemon = True
2664 def alarm1(sig, frame):
2665 signal.signal(signal.SIGALRM, alarm2)
2666 signal.alarm(1)
2667 def alarm2(sig, frame):
2668 t.start()
2669 signal.signal(signal.SIGALRM, alarm1)
2670 try:
2671 wio = self.io.open(w, **fdopen_kwargs)
2672 signal.alarm(1)
2673 # Expected behaviour:
2674 # - first raw write() is partial (because of the limited pipe buffer
2675 # and the first alarm)
2676 # - second raw write() returns EINTR (because of the second alarm)
2677 # - subsequent write()s are successful (either partial or complete)
2678 self.assertEqual(N, wio.write(item * N))
2679 wio.flush()
2680 write_finished = True
2681 t.join()
2682 self.assertEqual(N, sum(len(x) for x in read_results))
2683 finally:
2684 write_finished = True
2685 os.close(w)
2686 os.close(r)
2687 # This is deliberate. If we didn't close the file descriptor
2688 # before closing wio, wio would try to flush its internal
2689 # buffer, and could block (in case of failure).
2690 try:
2691 wio.close()
2692 except IOError as e:
2693 if e.errno != errno.EBADF:
2694 raise
2695
2696 def test_interrupterd_write_retry_buffered(self):
2697 self.check_interrupted_write_retry(b"x", mode="wb")
2698
2699 def test_interrupterd_write_retry_text(self):
2700 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
2701
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002702
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002703class CSignalsTest(SignalsTest):
2704 io = io
2705
2706class PySignalsTest(SignalsTest):
2707 io = pyio
2708
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002709 # Handling reentrancy issues would slow down _pyio even more, so the
2710 # tests are disabled.
2711 test_reentrant_write_buffered = None
2712 test_reentrant_write_text = None
2713
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002714
Christian Heimes1a6387e2008-03-26 12:49:49 +00002715def test_main():
Antoine Pitrou19690592009-06-12 20:14:08 +00002716 tests = (CIOTest, PyIOTest,
2717 CBufferedReaderTest, PyBufferedReaderTest,
2718 CBufferedWriterTest, PyBufferedWriterTest,
2719 CBufferedRWPairTest, PyBufferedRWPairTest,
2720 CBufferedRandomTest, PyBufferedRandomTest,
2721 StatefulIncrementalDecoderTest,
2722 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2723 CTextIOWrapperTest, PyTextIOWrapperTest,
2724 CMiscIOTest, PyMiscIOTest,
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002725 CSignalsTest, PySignalsTest,
Antoine Pitrou19690592009-06-12 20:14:08 +00002726 )
2727
2728 # Put the namespaces of the IO module we are testing and some useful mock
2729 # classes in the __dict__ of each test.
2730 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou6391b342010-09-14 18:48:19 +00002731 MockNonBlockWriterIO, MockRawIOWithoutRead)
Antoine Pitrou19690592009-06-12 20:14:08 +00002732 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2733 c_io_ns = dict((name, getattr(io, name)) for name in all_members)
2734 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
2735 globs = globals()
2736 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2737 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2738 # Avoid turning open into a bound method.
2739 py_io_ns["open"] = pyio.OpenWrapper
2740 for test in tests:
2741 if test.__name__.startswith("C"):
2742 for name, obj in c_io_ns.items():
2743 setattr(test, name, obj)
2744 elif test.__name__.startswith("Py"):
2745 for name, obj in py_io_ns.items():
2746 setattr(test, name, obj)
2747
2748 support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002749
2750if __name__ == "__main__":
Antoine Pitrou19690592009-06-12 20:14:08 +00002751 test_main()