blob: 8af8a6462247e96f3cffb2f3685aae61f088663b [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Antoine Pitrou19690592009-06-12 20:14:08 +000032import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000033import signal
34import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000035from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000036from collections import deque
37from test import test_support as support
Christian Heimes1a6387e2008-03-26 12:49:49 +000038
39import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000042try:
43 import threading
44except ImportError:
45 threading = None
Antoine Pitrou19690592009-06-12 20:14:08 +000046
47__metaclass__ = type
48bytes = support.py3k_bytes
49
50def _default_chunk_size():
51 """Get the default TextIOWrapper chunk size"""
52 with io.open(__file__, "r", encoding="latin1") as f:
53 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000054
55
Antoine Pitrou6391b342010-09-14 18:48:19 +000056class MockRawIOWithoutRead:
57 """A RawIO implementation without read(), so as to exercise the default
58 RawIO.read() which calls readinto()."""
Christian Heimes1a6387e2008-03-26 12:49:49 +000059
60 def __init__(self, read_stack=()):
61 self._read_stack = list(read_stack)
62 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +000063 self._reads = 0
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000064 self._extraneous_reads = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +000065
Christian Heimes1a6387e2008-03-26 12:49:49 +000066 def write(self, b):
Antoine Pitrou19690592009-06-12 20:14:08 +000067 self._write_stack.append(bytes(b))
Christian Heimes1a6387e2008-03-26 12:49:49 +000068 return len(b)
69
70 def writable(self):
71 return True
72
73 def fileno(self):
74 return 42
75
76 def readable(self):
77 return True
78
79 def seekable(self):
80 return True
81
82 def seek(self, pos, whence):
Antoine Pitrou19690592009-06-12 20:14:08 +000083 return 0 # wrong but we gotta return something
Christian Heimes1a6387e2008-03-26 12:49:49 +000084
85 def tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +000086 return 0 # same comment as above
87
88 def readinto(self, buf):
89 self._reads += 1
90 max_len = len(buf)
91 try:
92 data = self._read_stack[0]
93 except IndexError:
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000094 self._extraneous_reads += 1
Antoine Pitrou19690592009-06-12 20:14:08 +000095 return 0
96 if data is None:
97 del self._read_stack[0]
98 return None
99 n = len(data)
100 if len(data) <= max_len:
101 del self._read_stack[0]
102 buf[:n] = data
103 return n
104 else:
105 buf[:] = data[:max_len]
106 self._read_stack[0] = data[max_len:]
107 return max_len
108
109 def truncate(self, pos=None):
110 return pos
111
Antoine Pitrou6391b342010-09-14 18:48:19 +0000112class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
113 pass
114
115class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
116 pass
117
118
119class MockRawIO(MockRawIOWithoutRead):
120
121 def read(self, n=None):
122 self._reads += 1
123 try:
124 return self._read_stack.pop(0)
125 except:
126 self._extraneous_reads += 1
127 return b""
128
Antoine Pitrou19690592009-06-12 20:14:08 +0000129class CMockRawIO(MockRawIO, io.RawIOBase):
130 pass
131
132class PyMockRawIO(MockRawIO, pyio.RawIOBase):
133 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000134
135
Antoine Pitrou19690592009-06-12 20:14:08 +0000136class MisbehavedRawIO(MockRawIO):
137 def write(self, b):
138 return MockRawIO.write(self, b) * 2
139
140 def read(self, n=None):
141 return MockRawIO.read(self, n) * 2
142
143 def seek(self, pos, whence):
144 return -123
145
146 def tell(self):
147 return -456
148
149 def readinto(self, buf):
150 MockRawIO.readinto(self, buf)
151 return len(buf) * 5
152
153class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
154 pass
155
156class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
157 pass
158
159
160class CloseFailureIO(MockRawIO):
161 closed = 0
162
163 def close(self):
164 if not self.closed:
165 self.closed = 1
166 raise IOError
167
168class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
169 pass
170
171class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
172 pass
173
174
175class MockFileIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000176
177 def __init__(self, data):
178 self.read_history = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000179 super(MockFileIO, self).__init__(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000180
181 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +0000182 res = super(MockFileIO, self).read(n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000183 self.read_history.append(None if res is None else len(res))
184 return res
185
Antoine Pitrou19690592009-06-12 20:14:08 +0000186 def readinto(self, b):
187 res = super(MockFileIO, self).readinto(b)
188 self.read_history.append(res)
189 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000190
Antoine Pitrou19690592009-06-12 20:14:08 +0000191class CMockFileIO(MockFileIO, io.BytesIO):
192 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000193
Antoine Pitrou19690592009-06-12 20:14:08 +0000194class PyMockFileIO(MockFileIO, pyio.BytesIO):
195 pass
196
197
198class MockNonBlockWriterIO:
199
200 def __init__(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000201 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000202 self._blocker_char = None
Christian Heimes1a6387e2008-03-26 12:49:49 +0000203
Antoine Pitrou19690592009-06-12 20:14:08 +0000204 def pop_written(self):
205 s = b"".join(self._write_stack)
206 self._write_stack[:] = []
207 return s
208
209 def block_on(self, char):
210 """Block when a given char is encountered."""
211 self._blocker_char = char
212
213 def readable(self):
214 return True
215
216 def seekable(self):
217 return True
Christian Heimes1a6387e2008-03-26 12:49:49 +0000218
219 def writable(self):
220 return True
221
Antoine Pitrou19690592009-06-12 20:14:08 +0000222 def write(self, b):
223 b = bytes(b)
224 n = -1
225 if self._blocker_char:
226 try:
227 n = b.index(self._blocker_char)
228 except ValueError:
229 pass
230 else:
231 self._blocker_char = None
232 self._write_stack.append(b[:n])
233 raise self.BlockingIOError(0, "test blocking", n)
234 self._write_stack.append(b)
235 return len(b)
236
237class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
238 BlockingIOError = io.BlockingIOError
239
240class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
241 BlockingIOError = pyio.BlockingIOError
242
Christian Heimes1a6387e2008-03-26 12:49:49 +0000243
244class IOTest(unittest.TestCase):
245
Antoine Pitrou19690592009-06-12 20:14:08 +0000246 def setUp(self):
247 support.unlink(support.TESTFN)
248
Christian Heimes1a6387e2008-03-26 12:49:49 +0000249 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000250 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000251
252 def write_ops(self, f):
253 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000254 f.truncate(0)
255 self.assertEqual(f.tell(), 5)
256 f.seek(0)
257
258 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000259 self.assertEqual(f.seek(0), 0)
260 self.assertEqual(f.write(b"Hello."), 6)
261 self.assertEqual(f.tell(), 6)
262 self.assertEqual(f.seek(-1, 1), 5)
263 self.assertEqual(f.tell(), 5)
264 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
265 self.assertEqual(f.seek(0), 0)
266 self.assertEqual(f.write(b"h"), 1)
267 self.assertEqual(f.seek(-1, 2), 13)
268 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000269
Christian Heimes1a6387e2008-03-26 12:49:49 +0000270 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000271 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000272 self.assertRaises(TypeError, f.seek, 0.0)
273
274 def read_ops(self, f, buffered=False):
275 data = f.read(5)
276 self.assertEqual(data, b"hello")
277 data = bytearray(data)
278 self.assertEqual(f.readinto(data), 5)
279 self.assertEqual(data, b" worl")
280 self.assertEqual(f.readinto(data), 2)
281 self.assertEqual(len(data), 5)
282 self.assertEqual(data[:2], b"d\n")
283 self.assertEqual(f.seek(0), 0)
284 self.assertEqual(f.read(20), b"hello world\n")
285 self.assertEqual(f.read(1), b"")
286 self.assertEqual(f.readinto(bytearray(b"x")), 0)
287 self.assertEqual(f.seek(-6, 2), 6)
288 self.assertEqual(f.read(5), b"world")
289 self.assertEqual(f.read(0), b"")
290 self.assertEqual(f.readinto(bytearray()), 0)
291 self.assertEqual(f.seek(-6, 1), 5)
292 self.assertEqual(f.read(5), b" worl")
293 self.assertEqual(f.tell(), 10)
294 self.assertRaises(TypeError, f.seek, 0.0)
295 if buffered:
296 f.seek(0)
297 self.assertEqual(f.read(), b"hello world\n")
298 f.seek(6)
299 self.assertEqual(f.read(), b"world\n")
300 self.assertEqual(f.read(), b"")
301
302 LARGE = 2**31
303
304 def large_file_ops(self, f):
305 assert f.readable()
306 assert f.writable()
307 self.assertEqual(f.seek(self.LARGE), self.LARGE)
308 self.assertEqual(f.tell(), self.LARGE)
309 self.assertEqual(f.write(b"xxx"), 3)
310 self.assertEqual(f.tell(), self.LARGE + 3)
311 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
312 self.assertEqual(f.truncate(), self.LARGE + 2)
313 self.assertEqual(f.tell(), self.LARGE + 2)
314 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
315 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000316 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000317 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
318 self.assertEqual(f.seek(-1, 2), self.LARGE)
319 self.assertEqual(f.read(2), b"x")
320
Antoine Pitrou19690592009-06-12 20:14:08 +0000321 def test_invalid_operations(self):
322 # Try writing on a file opened in read mode and vice-versa.
323 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000324 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000325 self.assertRaises(IOError, fp.read)
326 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000327 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000328 self.assertRaises(IOError, fp.write, b"blah")
329 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000330 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000331 self.assertRaises(IOError, fp.write, "blah")
332 self.assertRaises(IOError, fp.writelines, ["blah\n"])
333
Christian Heimes1a6387e2008-03-26 12:49:49 +0000334 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000335 with self.open(support.TESTFN, "wb", buffering=0) as f:
336 self.assertEqual(f.readable(), False)
337 self.assertEqual(f.writable(), True)
338 self.assertEqual(f.seekable(), True)
339 self.write_ops(f)
340 with self.open(support.TESTFN, "rb", buffering=0) as f:
341 self.assertEqual(f.readable(), True)
342 self.assertEqual(f.writable(), False)
343 self.assertEqual(f.seekable(), True)
344 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000345
346 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000347 with self.open(support.TESTFN, "wb") as f:
348 self.assertEqual(f.readable(), False)
349 self.assertEqual(f.writable(), True)
350 self.assertEqual(f.seekable(), True)
351 self.write_ops(f)
352 with self.open(support.TESTFN, "rb") as f:
353 self.assertEqual(f.readable(), True)
354 self.assertEqual(f.writable(), False)
355 self.assertEqual(f.seekable(), True)
356 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000357
358 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000359 with self.open(support.TESTFN, "wb") as f:
360 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
361 with self.open(support.TESTFN, "rb") as f:
362 self.assertEqual(f.readline(), b"abc\n")
363 self.assertEqual(f.readline(10), b"def\n")
364 self.assertEqual(f.readline(2), b"xy")
365 self.assertEqual(f.readline(4), b"zzy\n")
366 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000367 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000368 self.assertRaises(TypeError, f.readline, 5.3)
369 with self.open(support.TESTFN, "r") as f:
370 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000371
372 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000373 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000374 self.write_ops(f)
375 data = f.getvalue()
376 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000377 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000378 self.read_ops(f, True)
379
380 def test_large_file_ops(self):
381 # On Windows and Mac OSX this test comsumes large resources; It takes
382 # a long time to build the >2GB file and takes >2GB of disk space
383 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000384 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
385 if not support.is_resource_enabled("largefile"):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000386 print("\nTesting large file ops skipped on %s." % sys.platform,
387 file=sys.stderr)
388 print("It requires %d bytes and a long time." % self.LARGE,
389 file=sys.stderr)
390 print("Use 'regrtest.py -u largefile test_io' to run it.",
391 file=sys.stderr)
392 return
Antoine Pitrou19690592009-06-12 20:14:08 +0000393 with self.open(support.TESTFN, "w+b", 0) as f:
394 self.large_file_ops(f)
395 with self.open(support.TESTFN, "w+b") as f:
396 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000397
398 def test_with_open(self):
399 for bufsize in (0, 1, 100):
400 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000401 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000402 f.write(b"xxx")
403 self.assertEqual(f.closed, True)
404 f = None
405 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000406 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000407 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000408 except ZeroDivisionError:
409 self.assertEqual(f.closed, True)
410 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000411 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000412
Antoine Pitroue741cc62009-01-21 00:45:36 +0000413 # issue 5008
414 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000415 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000416 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000417 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000418 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000419 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000420 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000421 with self.open(support.TESTFN, "a") as f:
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000422 self.assertTrue(f.tell() > 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000423
Christian Heimes1a6387e2008-03-26 12:49:49 +0000424 def test_destructor(self):
425 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000426 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000427 def __del__(self):
428 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000429 try:
430 f = super(MyFileIO, self).__del__
431 except AttributeError:
432 pass
433 else:
434 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000435 def close(self):
436 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000437 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000438 def flush(self):
439 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000440 super(MyFileIO, self).flush()
441 f = MyFileIO(support.TESTFN, "wb")
442 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000443 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000444 support.gc_collect()
445 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000446 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000447 self.assertEqual(f.read(), b"xxx")
448
449 def _check_base_destructor(self, base):
450 record = []
451 class MyIO(base):
452 def __init__(self):
453 # This exercises the availability of attributes on object
454 # destruction.
455 # (in the C version, close() is called by the tp_dealloc
456 # function, not by __del__)
457 self.on_del = 1
458 self.on_close = 2
459 self.on_flush = 3
460 def __del__(self):
461 record.append(self.on_del)
462 try:
463 f = super(MyIO, self).__del__
464 except AttributeError:
465 pass
466 else:
467 f()
468 def close(self):
469 record.append(self.on_close)
470 super(MyIO, self).close()
471 def flush(self):
472 record.append(self.on_flush)
473 super(MyIO, self).flush()
474 f = MyIO()
475 del f
476 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000477 self.assertEqual(record, [1, 2, 3])
478
Antoine Pitrou19690592009-06-12 20:14:08 +0000479 def test_IOBase_destructor(self):
480 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000481
Antoine Pitrou19690592009-06-12 20:14:08 +0000482 def test_RawIOBase_destructor(self):
483 self._check_base_destructor(self.RawIOBase)
484
485 def test_BufferedIOBase_destructor(self):
486 self._check_base_destructor(self.BufferedIOBase)
487
488 def test_TextIOBase_destructor(self):
489 self._check_base_destructor(self.TextIOBase)
490
491 def test_close_flushes(self):
492 with self.open(support.TESTFN, "wb") as f:
493 f.write(b"xxx")
494 with self.open(support.TESTFN, "rb") as f:
495 self.assertEqual(f.read(), b"xxx")
496
497 def test_array_writes(self):
498 a = array.array(b'i', range(10))
499 n = len(a.tostring())
500 with self.open(support.TESTFN, "wb", 0) as f:
501 self.assertEqual(f.write(a), n)
502 with self.open(support.TESTFN, "wb") as f:
503 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000504
505 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000506 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000507 closefd=False)
508
Antoine Pitrou19690592009-06-12 20:14:08 +0000509 def test_read_closed(self):
510 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000511 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000512 with self.open(support.TESTFN, "r") as f:
513 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000514 self.assertEqual(file.read(), "egg\n")
515 file.seek(0)
516 file.close()
517 self.assertRaises(ValueError, file.read)
518
519 def test_no_closefd_with_filename(self):
520 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000521 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000522
523 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000524 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000525 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000526 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000527 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000528 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000529 self.assertEqual(file.buffer.raw.closefd, False)
530
Antoine Pitrou19690592009-06-12 20:14:08 +0000531 def test_garbage_collection(self):
532 # FileIO objects are collected, and collecting them flushes
533 # all data to disk.
534 f = self.FileIO(support.TESTFN, "wb")
535 f.write(b"abcxxx")
536 f.f = f
537 wr = weakref.ref(f)
538 del f
539 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000540 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000541 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000542 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000543
Antoine Pitrou19690592009-06-12 20:14:08 +0000544 def test_unbounded_file(self):
545 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
546 zero = "/dev/zero"
547 if not os.path.exists(zero):
548 self.skipTest("{0} does not exist".format(zero))
549 if sys.maxsize > 0x7FFFFFFF:
550 self.skipTest("test can only run in a 32-bit address space")
551 if support.real_max_memuse < support._2G:
552 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000553 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000554 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000555 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000556 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000557 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000558 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000559
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000560 def test_flush_error_on_close(self):
561 f = self.open(support.TESTFN, "wb", buffering=0)
562 def bad_flush():
563 raise IOError()
564 f.flush = bad_flush
565 self.assertRaises(IOError, f.close) # exception not swallowed
566
567 def test_multi_close(self):
568 f = self.open(support.TESTFN, "wb", buffering=0)
569 f.close()
570 f.close()
571 f.close()
572 self.assertRaises(ValueError, f.flush)
573
Antoine Pitrou6391b342010-09-14 18:48:19 +0000574 def test_RawIOBase_read(self):
575 # Exercise the default RawIOBase.read() implementation (which calls
576 # readinto() internally).
577 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
578 self.assertEqual(rawio.read(2), b"ab")
579 self.assertEqual(rawio.read(2), b"c")
580 self.assertEqual(rawio.read(2), b"d")
581 self.assertEqual(rawio.read(2), None)
582 self.assertEqual(rawio.read(2), b"ef")
583 self.assertEqual(rawio.read(2), b"g")
584 self.assertEqual(rawio.read(2), None)
585 self.assertEqual(rawio.read(2), b"")
586
Antoine Pitrou19690592009-06-12 20:14:08 +0000587class CIOTest(IOTest):
588 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000589
Antoine Pitrou19690592009-06-12 20:14:08 +0000590class PyIOTest(IOTest):
591 test_array_writes = unittest.skip(
592 "len(array.array) returns number of elements rather than bytelength"
593 )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000594
595
Antoine Pitrou19690592009-06-12 20:14:08 +0000596class CommonBufferedTests:
597 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
598
599 def test_detach(self):
600 raw = self.MockRawIO()
601 buf = self.tp(raw)
602 self.assertIs(buf.detach(), raw)
603 self.assertRaises(ValueError, buf.detach)
604
605 def test_fileno(self):
606 rawio = self.MockRawIO()
607 bufio = self.tp(rawio)
608
Ezio Melotti2623a372010-11-21 13:34:58 +0000609 self.assertEqual(42, bufio.fileno())
Antoine Pitrou19690592009-06-12 20:14:08 +0000610
611 def test_no_fileno(self):
612 # XXX will we always have fileno() function? If so, kill
613 # this test. Else, write it.
614 pass
615
616 def test_invalid_args(self):
617 rawio = self.MockRawIO()
618 bufio = self.tp(rawio)
619 # Invalid whence
620 self.assertRaises(ValueError, bufio.seek, 0, -1)
621 self.assertRaises(ValueError, bufio.seek, 0, 3)
622
623 def test_override_destructor(self):
624 tp = self.tp
625 record = []
626 class MyBufferedIO(tp):
627 def __del__(self):
628 record.append(1)
629 try:
630 f = super(MyBufferedIO, self).__del__
631 except AttributeError:
632 pass
633 else:
634 f()
635 def close(self):
636 record.append(2)
637 super(MyBufferedIO, self).close()
638 def flush(self):
639 record.append(3)
640 super(MyBufferedIO, self).flush()
641 rawio = self.MockRawIO()
642 bufio = MyBufferedIO(rawio)
643 writable = bufio.writable()
644 del bufio
645 support.gc_collect()
646 if writable:
647 self.assertEqual(record, [1, 2, 3])
648 else:
649 self.assertEqual(record, [1, 2])
650
651 def test_context_manager(self):
652 # Test usability as a context manager
653 rawio = self.MockRawIO()
654 bufio = self.tp(rawio)
655 def _with():
656 with bufio:
657 pass
658 _with()
659 # bufio should now be closed, and using it a second time should raise
660 # a ValueError.
661 self.assertRaises(ValueError, _with)
662
663 def test_error_through_destructor(self):
664 # Test that the exception state is not modified by a destructor,
665 # even if close() fails.
666 rawio = self.CloseFailureIO()
667 def f():
668 self.tp(rawio).xyzzy
669 with support.captured_output("stderr") as s:
670 self.assertRaises(AttributeError, f)
671 s = s.getvalue().strip()
672 if s:
673 # The destructor *may* have printed an unraisable error, check it
674 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000675 self.assertTrue(s.startswith("Exception IOError: "), s)
676 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000677
678 def test_repr(self):
679 raw = self.MockRawIO()
680 b = self.tp(raw)
681 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
682 self.assertEqual(repr(b), "<%s>" % clsname)
683 raw.name = "dummy"
684 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
685 raw.name = b"dummy"
686 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000687
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000688 def test_flush_error_on_close(self):
689 raw = self.MockRawIO()
690 def bad_flush():
691 raise IOError()
692 raw.flush = bad_flush
693 b = self.tp(raw)
694 self.assertRaises(IOError, b.close) # exception not swallowed
695
696 def test_multi_close(self):
697 raw = self.MockRawIO()
698 b = self.tp(raw)
699 b.close()
700 b.close()
701 b.close()
702 self.assertRaises(ValueError, b.flush)
703
Antoine Pitroufc9ead62010-12-21 21:26:55 +0000704 def test_readonly_attributes(self):
705 raw = self.MockRawIO()
706 buf = self.tp(raw)
707 x = self.MockRawIO()
708 with self.assertRaises((AttributeError, TypeError)):
709 buf.raw = x
710
Christian Heimes1a6387e2008-03-26 12:49:49 +0000711
Antoine Pitrou19690592009-06-12 20:14:08 +0000712class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
713 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000714
Antoine Pitrou19690592009-06-12 20:14:08 +0000715 def test_constructor(self):
716 rawio = self.MockRawIO([b"abc"])
717 bufio = self.tp(rawio)
718 bufio.__init__(rawio)
719 bufio.__init__(rawio, buffer_size=1024)
720 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000721 self.assertEqual(b"abc", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000722 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
723 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
724 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
725 rawio = self.MockRawIO([b"abc"])
726 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000727 self.assertEqual(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000728
Antoine Pitrou19690592009-06-12 20:14:08 +0000729 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000730 for arg in (None, 7):
731 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
732 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000733 self.assertEqual(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000734 # Invalid args
735 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000736
Antoine Pitrou19690592009-06-12 20:14:08 +0000737 def test_read1(self):
738 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
739 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000740 self.assertEqual(b"a", bufio.read(1))
741 self.assertEqual(b"b", bufio.read1(1))
742 self.assertEqual(rawio._reads, 1)
743 self.assertEqual(b"c", bufio.read1(100))
744 self.assertEqual(rawio._reads, 1)
745 self.assertEqual(b"d", bufio.read1(100))
746 self.assertEqual(rawio._reads, 2)
747 self.assertEqual(b"efg", bufio.read1(100))
748 self.assertEqual(rawio._reads, 3)
749 self.assertEqual(b"", bufio.read1(100))
750 self.assertEqual(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000751 # Invalid args
752 self.assertRaises(ValueError, bufio.read1, -1)
753
754 def test_readinto(self):
755 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
756 bufio = self.tp(rawio)
757 b = bytearray(2)
Ezio Melotti2623a372010-11-21 13:34:58 +0000758 self.assertEqual(bufio.readinto(b), 2)
759 self.assertEqual(b, b"ab")
760 self.assertEqual(bufio.readinto(b), 2)
761 self.assertEqual(b, b"cd")
762 self.assertEqual(bufio.readinto(b), 2)
763 self.assertEqual(b, b"ef")
764 self.assertEqual(bufio.readinto(b), 1)
765 self.assertEqual(b, b"gf")
766 self.assertEqual(bufio.readinto(b), 0)
767 self.assertEqual(b, b"gf")
Antoine Pitrou19690592009-06-12 20:14:08 +0000768
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000769 def test_readlines(self):
770 def bufio():
771 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
772 return self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000773 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
774 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
775 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000776
Antoine Pitrou19690592009-06-12 20:14:08 +0000777 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000778 data = b"abcdefghi"
779 dlen = len(data)
780
781 tests = [
782 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
783 [ 100, [ 3, 3, 3], [ dlen ] ],
784 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
785 ]
786
787 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000788 rawio = self.MockFileIO(data)
789 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000790 pos = 0
791 for nbytes in buf_read_sizes:
Ezio Melotti2623a372010-11-21 13:34:58 +0000792 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Christian Heimes1a6387e2008-03-26 12:49:49 +0000793 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000794 # this is mildly implementation-dependent
Ezio Melotti2623a372010-11-21 13:34:58 +0000795 self.assertEqual(rawio.read_history, raw_read_sizes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000796
Antoine Pitrou19690592009-06-12 20:14:08 +0000797 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000798 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000799 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
800 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000801
Ezio Melotti2623a372010-11-21 13:34:58 +0000802 self.assertEqual(b"abcd", bufio.read(6))
803 self.assertEqual(b"e", bufio.read(1))
804 self.assertEqual(b"fg", bufio.read())
805 self.assertEqual(b"", bufio.peek(1))
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000806 self.assertTrue(None is bufio.read())
Ezio Melotti2623a372010-11-21 13:34:58 +0000807 self.assertEqual(b"", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000808
Antoine Pitrou19690592009-06-12 20:14:08 +0000809 def test_read_past_eof(self):
810 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
811 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000812
Ezio Melotti2623a372010-11-21 13:34:58 +0000813 self.assertEqual(b"abcdefg", bufio.read(9000))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000814
Antoine Pitrou19690592009-06-12 20:14:08 +0000815 def test_read_all(self):
816 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
817 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000818
Ezio Melotti2623a372010-11-21 13:34:58 +0000819 self.assertEqual(b"abcdefg", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000820
Victor Stinner6a102812010-04-27 23:55:59 +0000821 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +0000822 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +0000823 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000824 try:
825 # Write out many bytes with exactly the same number of 0's,
826 # 1's... 255's. This will help us check that concurrent reading
827 # doesn't duplicate or forget contents.
828 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +0000829 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000830 random.shuffle(l)
831 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000832 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000833 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000834 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000835 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000836 errors = []
837 results = []
838 def f():
839 try:
840 # Intra-buffer read then buffer-flushing read
841 for n in cycle([1, 19]):
842 s = bufio.read(n)
843 if not s:
844 break
845 # list.append() is atomic
846 results.append(s)
847 except Exception as e:
848 errors.append(e)
849 raise
850 threads = [threading.Thread(target=f) for x in range(20)]
851 for t in threads:
852 t.start()
853 time.sleep(0.02) # yield
854 for t in threads:
855 t.join()
856 self.assertFalse(errors,
857 "the following exceptions were caught: %r" % errors)
858 s = b''.join(results)
859 for i in range(256):
860 c = bytes(bytearray([i]))
861 self.assertEqual(s.count(c), N)
862 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +0000863 support.unlink(support.TESTFN)
864
865 def test_misbehaved_io(self):
866 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
867 bufio = self.tp(rawio)
868 self.assertRaises(IOError, bufio.seek, 0)
869 self.assertRaises(IOError, bufio.tell)
870
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000871 def test_no_extraneous_read(self):
872 # Issue #9550; when the raw IO object has satisfied the read request,
873 # we should not issue any additional reads, otherwise it may block
874 # (e.g. socket).
875 bufsize = 16
876 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
877 rawio = self.MockRawIO([b"x" * n])
878 bufio = self.tp(rawio, bufsize)
879 self.assertEqual(bufio.read(n), b"x" * n)
880 # Simple case: one raw read is enough to satisfy the request.
881 self.assertEqual(rawio._extraneous_reads, 0,
882 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
883 # A more complex case where two raw reads are needed to satisfy
884 # the request.
885 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
886 bufio = self.tp(rawio, bufsize)
887 self.assertEqual(bufio.read(n), b"x" * n)
888 self.assertEqual(rawio._extraneous_reads, 0,
889 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
890
891
Antoine Pitrou19690592009-06-12 20:14:08 +0000892class CBufferedReaderTest(BufferedReaderTest):
893 tp = io.BufferedReader
894
895 def test_constructor(self):
896 BufferedReaderTest.test_constructor(self)
897 # The allocation can succeed on 32-bit builds, e.g. with more
898 # than 2GB RAM and a 64-bit kernel.
899 if sys.maxsize > 0x7FFFFFFF:
900 rawio = self.MockRawIO()
901 bufio = self.tp(rawio)
902 self.assertRaises((OverflowError, MemoryError, ValueError),
903 bufio.__init__, rawio, sys.maxsize)
904
905 def test_initialization(self):
906 rawio = self.MockRawIO([b"abc"])
907 bufio = self.tp(rawio)
908 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
909 self.assertRaises(ValueError, bufio.read)
910 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
911 self.assertRaises(ValueError, bufio.read)
912 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
913 self.assertRaises(ValueError, bufio.read)
914
915 def test_misbehaved_io_read(self):
916 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
917 bufio = self.tp(rawio)
918 # _pyio.BufferedReader seems to implement reading different, so that
919 # checking this is not so easy.
920 self.assertRaises(IOError, bufio.read, 10)
921
922 def test_garbage_collection(self):
923 # C BufferedReader objects are collected.
924 # The Python version has __del__, so it ends into gc.garbage instead
925 rawio = self.FileIO(support.TESTFN, "w+b")
926 f = self.tp(rawio)
927 f.f = f
928 wr = weakref.ref(f)
929 del f
930 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000931 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +0000932
933class PyBufferedReaderTest(BufferedReaderTest):
934 tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000935
936
Antoine Pitrou19690592009-06-12 20:14:08 +0000937class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
938 write_mode = "wb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000939
Antoine Pitrou19690592009-06-12 20:14:08 +0000940 def test_constructor(self):
941 rawio = self.MockRawIO()
942 bufio = self.tp(rawio)
943 bufio.__init__(rawio)
944 bufio.__init__(rawio, buffer_size=1024)
945 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000946 self.assertEqual(3, bufio.write(b"abc"))
Antoine Pitrou19690592009-06-12 20:14:08 +0000947 bufio.flush()
948 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
949 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
950 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
951 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000952 self.assertEqual(3, bufio.write(b"ghi"))
Antoine Pitrou19690592009-06-12 20:14:08 +0000953 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +0000954 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000955
Antoine Pitrou19690592009-06-12 20:14:08 +0000956 def test_detach_flush(self):
957 raw = self.MockRawIO()
958 buf = self.tp(raw)
959 buf.write(b"howdy!")
960 self.assertFalse(raw._write_stack)
961 buf.detach()
962 self.assertEqual(raw._write_stack, [b"howdy!"])
963
964 def test_write(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000965 # Write to the buffered IO but don't overflow the buffer.
Antoine Pitrou19690592009-06-12 20:14:08 +0000966 writer = self.MockRawIO()
967 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000968 bufio.write(b"abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000969 self.assertFalse(writer._write_stack)
970
Antoine Pitrou19690592009-06-12 20:14:08 +0000971 def test_write_overflow(self):
972 writer = self.MockRawIO()
973 bufio = self.tp(writer, 8)
974 contents = b"abcdefghijklmnop"
975 for n in range(0, len(contents), 3):
976 bufio.write(contents[n:n+3])
977 flushed = b"".join(writer._write_stack)
978 # At least (total - 8) bytes were implicitly flushed, perhaps more
979 # depending on the implementation.
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000980 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000981
Antoine Pitrou19690592009-06-12 20:14:08 +0000982 def check_writes(self, intermediate_func):
983 # Lots of writes, test the flushed output is as expected.
984 contents = bytes(range(256)) * 1000
985 n = 0
986 writer = self.MockRawIO()
987 bufio = self.tp(writer, 13)
988 # Generator of write sizes: repeat each N 15 times then proceed to N+1
989 def gen_sizes():
990 for size in count(1):
991 for i in range(15):
992 yield size
993 sizes = gen_sizes()
994 while n < len(contents):
995 size = min(next(sizes), len(contents) - n)
Ezio Melotti2623a372010-11-21 13:34:58 +0000996 self.assertEqual(bufio.write(contents[n:n+size]), size)
Antoine Pitrou19690592009-06-12 20:14:08 +0000997 intermediate_func(bufio)
998 n += size
999 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001000 self.assertEqual(contents,
Antoine Pitrou19690592009-06-12 20:14:08 +00001001 b"".join(writer._write_stack))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001002
Antoine Pitrou19690592009-06-12 20:14:08 +00001003 def test_writes(self):
1004 self.check_writes(lambda bufio: None)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001005
Antoine Pitrou19690592009-06-12 20:14:08 +00001006 def test_writes_and_flushes(self):
1007 self.check_writes(lambda bufio: bufio.flush())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001008
Antoine Pitrou19690592009-06-12 20:14:08 +00001009 def test_writes_and_seeks(self):
1010 def _seekabs(bufio):
1011 pos = bufio.tell()
1012 bufio.seek(pos + 1, 0)
1013 bufio.seek(pos - 1, 0)
1014 bufio.seek(pos, 0)
1015 self.check_writes(_seekabs)
1016 def _seekrel(bufio):
1017 pos = bufio.seek(0, 1)
1018 bufio.seek(+1, 1)
1019 bufio.seek(-1, 1)
1020 bufio.seek(pos, 0)
1021 self.check_writes(_seekrel)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001022
Antoine Pitrou19690592009-06-12 20:14:08 +00001023 def test_writes_and_truncates(self):
1024 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001025
Antoine Pitrou19690592009-06-12 20:14:08 +00001026 def test_write_non_blocking(self):
1027 raw = self.MockNonBlockWriterIO()
1028 bufio = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001029
Ezio Melotti2623a372010-11-21 13:34:58 +00001030 self.assertEqual(bufio.write(b"abcd"), 4)
1031 self.assertEqual(bufio.write(b"efghi"), 5)
Antoine Pitrou19690592009-06-12 20:14:08 +00001032 # 1 byte will be written, the rest will be buffered
1033 raw.block_on(b"k")
Ezio Melotti2623a372010-11-21 13:34:58 +00001034 self.assertEqual(bufio.write(b"jklmn"), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001035
Antoine Pitrou19690592009-06-12 20:14:08 +00001036 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1037 raw.block_on(b"0")
1038 try:
1039 bufio.write(b"opqrwxyz0123456789")
1040 except self.BlockingIOError as e:
1041 written = e.characters_written
1042 else:
1043 self.fail("BlockingIOError should have been raised")
Ezio Melotti2623a372010-11-21 13:34:58 +00001044 self.assertEqual(written, 16)
1045 self.assertEqual(raw.pop_written(),
Antoine Pitrou19690592009-06-12 20:14:08 +00001046 b"abcdefghijklmnopqrwxyz")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001047
Ezio Melotti2623a372010-11-21 13:34:58 +00001048 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Antoine Pitrou19690592009-06-12 20:14:08 +00001049 s = raw.pop_written()
1050 # Previously buffered bytes were flushed
1051 self.assertTrue(s.startswith(b"01234567A"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001052
Antoine Pitrou19690592009-06-12 20:14:08 +00001053 def test_write_and_rewind(self):
1054 raw = io.BytesIO()
1055 bufio = self.tp(raw, 4)
1056 self.assertEqual(bufio.write(b"abcdef"), 6)
1057 self.assertEqual(bufio.tell(), 6)
1058 bufio.seek(0, 0)
1059 self.assertEqual(bufio.write(b"XY"), 2)
1060 bufio.seek(6, 0)
1061 self.assertEqual(raw.getvalue(), b"XYcdef")
1062 self.assertEqual(bufio.write(b"123456"), 6)
1063 bufio.flush()
1064 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001065
Antoine Pitrou19690592009-06-12 20:14:08 +00001066 def test_flush(self):
1067 writer = self.MockRawIO()
1068 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001069 bufio.write(b"abc")
1070 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001071 self.assertEqual(b"abc", writer._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001072
Antoine Pitrou19690592009-06-12 20:14:08 +00001073 def test_destructor(self):
1074 writer = self.MockRawIO()
1075 bufio = self.tp(writer, 8)
1076 bufio.write(b"abc")
1077 del bufio
1078 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001079 self.assertEqual(b"abc", writer._write_stack[0])
Antoine Pitrou19690592009-06-12 20:14:08 +00001080
1081 def test_truncate(self):
1082 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001083 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001084 bufio = self.tp(raw, 8)
1085 bufio.write(b"abcdef")
1086 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001087 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001088 with self.open(support.TESTFN, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001089 self.assertEqual(f.read(), b"abc")
1090
Victor Stinner6a102812010-04-27 23:55:59 +00001091 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +00001092 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +00001093 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001094 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001095 # Write out many bytes from many threads and test they were
1096 # all flushed.
1097 N = 1000
1098 contents = bytes(range(256)) * N
1099 sizes = cycle([1, 19])
1100 n = 0
1101 queue = deque()
1102 while n < len(contents):
1103 size = next(sizes)
1104 queue.append(contents[n:n+size])
1105 n += size
1106 del contents
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001107 # We use a real file object because it allows us to
1108 # exercise situations where the GIL is released before
1109 # writing the buffer to the raw streams. This is in addition
1110 # to concurrency issues due to switching threads in the middle
1111 # of Python code.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001112 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001113 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001114 errors = []
1115 def f():
1116 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001117 while True:
1118 try:
1119 s = queue.popleft()
1120 except IndexError:
1121 return
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001122 bufio.write(s)
1123 except Exception as e:
1124 errors.append(e)
1125 raise
1126 threads = [threading.Thread(target=f) for x in range(20)]
1127 for t in threads:
1128 t.start()
1129 time.sleep(0.02) # yield
1130 for t in threads:
1131 t.join()
1132 self.assertFalse(errors,
1133 "the following exceptions were caught: %r" % errors)
Antoine Pitrou19690592009-06-12 20:14:08 +00001134 bufio.close()
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001135 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001136 s = f.read()
1137 for i in range(256):
Ezio Melotti2623a372010-11-21 13:34:58 +00001138 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001139 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001140 support.unlink(support.TESTFN)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001141
Antoine Pitrou19690592009-06-12 20:14:08 +00001142 def test_misbehaved_io(self):
1143 rawio = self.MisbehavedRawIO()
1144 bufio = self.tp(rawio, 5)
1145 self.assertRaises(IOError, bufio.seek, 0)
1146 self.assertRaises(IOError, bufio.tell)
1147 self.assertRaises(IOError, bufio.write, b"abcdef")
1148
1149 def test_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001150 with support.check_warnings(("max_buffer_size is deprecated",
1151 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001152 self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001153
1154
1155class CBufferedWriterTest(BufferedWriterTest):
1156 tp = io.BufferedWriter
1157
1158 def test_constructor(self):
1159 BufferedWriterTest.test_constructor(self)
1160 # The allocation can succeed on 32-bit builds, e.g. with more
1161 # than 2GB RAM and a 64-bit kernel.
1162 if sys.maxsize > 0x7FFFFFFF:
1163 rawio = self.MockRawIO()
1164 bufio = self.tp(rawio)
1165 self.assertRaises((OverflowError, MemoryError, ValueError),
1166 bufio.__init__, rawio, sys.maxsize)
1167
1168 def test_initialization(self):
1169 rawio = self.MockRawIO()
1170 bufio = self.tp(rawio)
1171 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1172 self.assertRaises(ValueError, bufio.write, b"def")
1173 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1174 self.assertRaises(ValueError, bufio.write, b"def")
1175 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1176 self.assertRaises(ValueError, bufio.write, b"def")
1177
1178 def test_garbage_collection(self):
1179 # C BufferedWriter objects are collected, and collecting them flushes
1180 # all data to disk.
1181 # The Python version has __del__, so it ends into gc.garbage instead
1182 rawio = self.FileIO(support.TESTFN, "w+b")
1183 f = self.tp(rawio)
1184 f.write(b"123xxx")
1185 f.x = f
1186 wr = weakref.ref(f)
1187 del f
1188 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001189 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001190 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001191 self.assertEqual(f.read(), b"123xxx")
1192
1193
1194class PyBufferedWriterTest(BufferedWriterTest):
1195 tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001196
1197class BufferedRWPairTest(unittest.TestCase):
1198
Antoine Pitrou19690592009-06-12 20:14:08 +00001199 def test_constructor(self):
1200 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001201 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001202
Antoine Pitrou19690592009-06-12 20:14:08 +00001203 def test_detach(self):
1204 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1205 self.assertRaises(self.UnsupportedOperation, pair.detach)
1206
1207 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001208 with support.check_warnings(("max_buffer_size is deprecated",
1209 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001210 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001211
1212 def test_constructor_with_not_readable(self):
1213 class NotReadable(MockRawIO):
1214 def readable(self):
1215 return False
1216
1217 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1218
1219 def test_constructor_with_not_writeable(self):
1220 class NotWriteable(MockRawIO):
1221 def writable(self):
1222 return False
1223
1224 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1225
1226 def test_read(self):
1227 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1228
1229 self.assertEqual(pair.read(3), b"abc")
1230 self.assertEqual(pair.read(1), b"d")
1231 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001232 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1233 self.assertEqual(pair.read(None), b"abc")
1234
1235 def test_readlines(self):
1236 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1237 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1238 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1239 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001240
1241 def test_read1(self):
1242 # .read1() is delegated to the underlying reader object, so this test
1243 # can be shallow.
1244 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1245
1246 self.assertEqual(pair.read1(3), b"abc")
1247
1248 def test_readinto(self):
1249 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1250
1251 data = bytearray(5)
1252 self.assertEqual(pair.readinto(data), 5)
1253 self.assertEqual(data, b"abcde")
1254
1255 def test_write(self):
1256 w = self.MockRawIO()
1257 pair = self.tp(self.MockRawIO(), w)
1258
1259 pair.write(b"abc")
1260 pair.flush()
1261 pair.write(b"def")
1262 pair.flush()
1263 self.assertEqual(w._write_stack, [b"abc", b"def"])
1264
1265 def test_peek(self):
1266 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1267
1268 self.assertTrue(pair.peek(3).startswith(b"abc"))
1269 self.assertEqual(pair.read(3), b"abc")
1270
1271 def test_readable(self):
1272 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1273 self.assertTrue(pair.readable())
1274
1275 def test_writeable(self):
1276 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1277 self.assertTrue(pair.writable())
1278
1279 def test_seekable(self):
1280 # BufferedRWPairs are never seekable, even if their readers and writers
1281 # are.
1282 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1283 self.assertFalse(pair.seekable())
1284
1285 # .flush() is delegated to the underlying writer object and has been
1286 # tested in the test_write method.
1287
1288 def test_close_and_closed(self):
1289 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1290 self.assertFalse(pair.closed)
1291 pair.close()
1292 self.assertTrue(pair.closed)
1293
1294 def test_isatty(self):
1295 class SelectableIsAtty(MockRawIO):
1296 def __init__(self, isatty):
1297 MockRawIO.__init__(self)
1298 self._isatty = isatty
1299
1300 def isatty(self):
1301 return self._isatty
1302
1303 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1304 self.assertFalse(pair.isatty())
1305
1306 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1307 self.assertTrue(pair.isatty())
1308
1309 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1310 self.assertTrue(pair.isatty())
1311
1312 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1313 self.assertTrue(pair.isatty())
1314
1315class CBufferedRWPairTest(BufferedRWPairTest):
1316 tp = io.BufferedRWPair
1317
1318class PyBufferedRWPairTest(BufferedRWPairTest):
1319 tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001320
1321
Antoine Pitrou19690592009-06-12 20:14:08 +00001322class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1323 read_mode = "rb+"
1324 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001325
Antoine Pitrou19690592009-06-12 20:14:08 +00001326 def test_constructor(self):
1327 BufferedReaderTest.test_constructor(self)
1328 BufferedWriterTest.test_constructor(self)
1329
1330 def test_read_and_write(self):
1331 raw = self.MockRawIO((b"asdf", b"ghjk"))
1332 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001333
1334 self.assertEqual(b"as", rw.read(2))
1335 rw.write(b"ddd")
1336 rw.write(b"eee")
1337 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001338 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001339 self.assertEqual(b"dddeee", raw._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001340
Antoine Pitrou19690592009-06-12 20:14:08 +00001341 def test_seek_and_tell(self):
1342 raw = self.BytesIO(b"asdfghjkl")
1343 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001344
Ezio Melotti2623a372010-11-21 13:34:58 +00001345 self.assertEqual(b"as", rw.read(2))
1346 self.assertEqual(2, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001347 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001348 self.assertEqual(b"asdf", rw.read(4))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001349
1350 rw.write(b"asdf")
1351 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001352 self.assertEqual(b"asdfasdfl", rw.read())
1353 self.assertEqual(9, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001354 rw.seek(-4, 2)
Ezio Melotti2623a372010-11-21 13:34:58 +00001355 self.assertEqual(5, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001356 rw.seek(2, 1)
Ezio Melotti2623a372010-11-21 13:34:58 +00001357 self.assertEqual(7, rw.tell())
1358 self.assertEqual(b"fl", rw.read(11))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001359 self.assertRaises(TypeError, rw.seek, 0.0)
1360
Antoine Pitrou19690592009-06-12 20:14:08 +00001361 def check_flush_and_read(self, read_func):
1362 raw = self.BytesIO(b"abcdefghi")
1363 bufio = self.tp(raw)
1364
Ezio Melotti2623a372010-11-21 13:34:58 +00001365 self.assertEqual(b"ab", read_func(bufio, 2))
Antoine Pitrou19690592009-06-12 20:14:08 +00001366 bufio.write(b"12")
Ezio Melotti2623a372010-11-21 13:34:58 +00001367 self.assertEqual(b"ef", read_func(bufio, 2))
1368 self.assertEqual(6, bufio.tell())
Antoine Pitrou19690592009-06-12 20:14:08 +00001369 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001370 self.assertEqual(6, bufio.tell())
1371 self.assertEqual(b"ghi", read_func(bufio))
Antoine Pitrou19690592009-06-12 20:14:08 +00001372 raw.seek(0, 0)
1373 raw.write(b"XYZ")
1374 # flush() resets the read buffer
1375 bufio.flush()
1376 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001377 self.assertEqual(b"XYZ", read_func(bufio, 3))
Antoine Pitrou19690592009-06-12 20:14:08 +00001378
1379 def test_flush_and_read(self):
1380 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1381
1382 def test_flush_and_readinto(self):
1383 def _readinto(bufio, n=-1):
1384 b = bytearray(n if n >= 0 else 9999)
1385 n = bufio.readinto(b)
1386 return bytes(b[:n])
1387 self.check_flush_and_read(_readinto)
1388
1389 def test_flush_and_peek(self):
1390 def _peek(bufio, n=-1):
1391 # This relies on the fact that the buffer can contain the whole
1392 # raw stream, otherwise peek() can return less.
1393 b = bufio.peek(n)
1394 if n != -1:
1395 b = b[:n]
1396 bufio.seek(len(b), 1)
1397 return b
1398 self.check_flush_and_read(_peek)
1399
1400 def test_flush_and_write(self):
1401 raw = self.BytesIO(b"abcdefghi")
1402 bufio = self.tp(raw)
1403
1404 bufio.write(b"123")
1405 bufio.flush()
1406 bufio.write(b"45")
1407 bufio.flush()
1408 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001409 self.assertEqual(b"12345fghi", raw.getvalue())
1410 self.assertEqual(b"12345fghi", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +00001411
1412 def test_threads(self):
1413 BufferedReaderTest.test_threads(self)
1414 BufferedWriterTest.test_threads(self)
1415
1416 def test_writes_and_peek(self):
1417 def _peek(bufio):
1418 bufio.peek(1)
1419 self.check_writes(_peek)
1420 def _peek(bufio):
1421 pos = bufio.tell()
1422 bufio.seek(-1, 1)
1423 bufio.peek(1)
1424 bufio.seek(pos, 0)
1425 self.check_writes(_peek)
1426
1427 def test_writes_and_reads(self):
1428 def _read(bufio):
1429 bufio.seek(-1, 1)
1430 bufio.read(1)
1431 self.check_writes(_read)
1432
1433 def test_writes_and_read1s(self):
1434 def _read1(bufio):
1435 bufio.seek(-1, 1)
1436 bufio.read1(1)
1437 self.check_writes(_read1)
1438
1439 def test_writes_and_readintos(self):
1440 def _read(bufio):
1441 bufio.seek(-1, 1)
1442 bufio.readinto(bytearray(1))
1443 self.check_writes(_read)
1444
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001445 def test_write_after_readahead(self):
1446 # Issue #6629: writing after the buffer was filled by readahead should
1447 # first rewind the raw stream.
1448 for overwrite_size in [1, 5]:
1449 raw = self.BytesIO(b"A" * 10)
1450 bufio = self.tp(raw, 4)
1451 # Trigger readahead
1452 self.assertEqual(bufio.read(1), b"A")
1453 self.assertEqual(bufio.tell(), 1)
1454 # Overwriting should rewind the raw stream if it needs so
1455 bufio.write(b"B" * overwrite_size)
1456 self.assertEqual(bufio.tell(), overwrite_size + 1)
1457 # If the write size was smaller than the buffer size, flush() and
1458 # check that rewind happens.
1459 bufio.flush()
1460 self.assertEqual(bufio.tell(), overwrite_size + 1)
1461 s = raw.getvalue()
1462 self.assertEqual(s,
1463 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1464
Antoine Pitrouee46a7b2011-05-13 00:31:52 +02001465 def test_write_rewind_write(self):
1466 # Various combinations of reading / writing / seeking backwards / writing again
1467 def mutate(bufio, pos1, pos2):
1468 assert pos2 >= pos1
1469 # Fill the buffer
1470 bufio.seek(pos1)
1471 bufio.read(pos2 - pos1)
1472 bufio.write(b'\x02')
1473 # This writes earlier than the previous write, but still inside
1474 # the buffer.
1475 bufio.seek(pos1)
1476 bufio.write(b'\x01')
1477
1478 b = b"\x80\x81\x82\x83\x84"
1479 for i in range(0, len(b)):
1480 for j in range(i, len(b)):
1481 raw = self.BytesIO(b)
1482 bufio = self.tp(raw, 100)
1483 mutate(bufio, i, j)
1484 bufio.flush()
1485 expected = bytearray(b)
1486 expected[j] = 2
1487 expected[i] = 1
1488 self.assertEqual(raw.getvalue(), expected,
1489 "failed result for i=%d, j=%d" % (i, j))
1490
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001491 def test_truncate_after_read_or_write(self):
1492 raw = self.BytesIO(b"A" * 10)
1493 bufio = self.tp(raw, 100)
1494 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1495 self.assertEqual(bufio.truncate(), 2)
1496 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1497 self.assertEqual(bufio.truncate(), 4)
1498
Antoine Pitrou19690592009-06-12 20:14:08 +00001499 def test_misbehaved_io(self):
1500 BufferedReaderTest.test_misbehaved_io(self)
1501 BufferedWriterTest.test_misbehaved_io(self)
1502
1503class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
1504 tp = io.BufferedRandom
1505
1506 def test_constructor(self):
1507 BufferedRandomTest.test_constructor(self)
1508 # The allocation can succeed on 32-bit builds, e.g. with more
1509 # than 2GB RAM and a 64-bit kernel.
1510 if sys.maxsize > 0x7FFFFFFF:
1511 rawio = self.MockRawIO()
1512 bufio = self.tp(rawio)
1513 self.assertRaises((OverflowError, MemoryError, ValueError),
1514 bufio.__init__, rawio, sys.maxsize)
1515
1516 def test_garbage_collection(self):
1517 CBufferedReaderTest.test_garbage_collection(self)
1518 CBufferedWriterTest.test_garbage_collection(self)
1519
1520class PyBufferedRandomTest(BufferedRandomTest):
1521 tp = pyio.BufferedRandom
1522
1523
Christian Heimes1a6387e2008-03-26 12:49:49 +00001524# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1525# properties:
1526# - A single output character can correspond to many bytes of input.
1527# - The number of input bytes to complete the character can be
1528# undetermined until the last input byte is received.
1529# - The number of input bytes can vary depending on previous input.
1530# - A single input byte can correspond to many characters of output.
1531# - The number of output characters can be undetermined until the
1532# last input byte is received.
1533# - The number of output characters can vary depending on previous input.
1534
1535class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1536 """
1537 For testing seek/tell behavior with a stateful, buffering decoder.
1538
1539 Input is a sequence of words. Words may be fixed-length (length set
1540 by input) or variable-length (period-terminated). In variable-length
1541 mode, extra periods are ignored. Possible words are:
1542 - 'i' followed by a number sets the input length, I (maximum 99).
1543 When I is set to 0, words are space-terminated.
1544 - 'o' followed by a number sets the output length, O (maximum 99).
1545 - Any other word is converted into a word followed by a period on
1546 the output. The output word consists of the input word truncated
1547 or padded out with hyphens to make its length equal to O. If O
1548 is 0, the word is output verbatim without truncating or padding.
1549 I and O are initially set to 1. When I changes, any buffered input is
1550 re-scanned according to the new I. EOF also terminates the last word.
1551 """
1552
1553 def __init__(self, errors='strict'):
1554 codecs.IncrementalDecoder.__init__(self, errors)
1555 self.reset()
1556
1557 def __repr__(self):
1558 return '<SID %x>' % id(self)
1559
1560 def reset(self):
1561 self.i = 1
1562 self.o = 1
1563 self.buffer = bytearray()
1564
1565 def getstate(self):
1566 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1567 return bytes(self.buffer), i*100 + o
1568
1569 def setstate(self, state):
1570 buffer, io = state
1571 self.buffer = bytearray(buffer)
1572 i, o = divmod(io, 100)
1573 self.i, self.o = i ^ 1, o ^ 1
1574
1575 def decode(self, input, final=False):
1576 output = ''
1577 for b in input:
1578 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001579 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +00001580 if self.buffer:
1581 output += self.process_word()
1582 else:
1583 self.buffer.append(b)
1584 else: # fixed-length, terminate after self.i bytes
1585 self.buffer.append(b)
1586 if len(self.buffer) == self.i:
1587 output += self.process_word()
1588 if final and self.buffer: # EOF terminates the last word
1589 output += self.process_word()
1590 return output
1591
1592 def process_word(self):
1593 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001594 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001595 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001596 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001597 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1598 else:
1599 output = self.buffer.decode('ascii')
1600 if len(output) < self.o:
1601 output += '-'*self.o # pad out with hyphens
1602 if self.o:
1603 output = output[:self.o] # truncate to output length
1604 output += '.'
1605 self.buffer = bytearray()
1606 return output
1607
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001608 codecEnabled = False
1609
1610 @classmethod
1611 def lookupTestDecoder(cls, name):
1612 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001613 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001614 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001615 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001616 incrementalencoder=None,
1617 streamreader=None, streamwriter=None,
1618 incrementaldecoder=cls)
1619
1620# Register the previous decoder for testing.
1621# Disabled by default, tests will enable it.
1622codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1623
1624
Christian Heimes1a6387e2008-03-26 12:49:49 +00001625class StatefulIncrementalDecoderTest(unittest.TestCase):
1626 """
1627 Make sure the StatefulIncrementalDecoder actually works.
1628 """
1629
1630 test_cases = [
1631 # I=1, O=1 (fixed-length input == fixed-length output)
1632 (b'abcd', False, 'a.b.c.d.'),
1633 # I=0, O=0 (variable-length input, variable-length output)
1634 (b'oiabcd', True, 'abcd.'),
1635 # I=0, O=0 (should ignore extra periods)
1636 (b'oi...abcd...', True, 'abcd.'),
1637 # I=0, O=6 (variable-length input, fixed-length output)
1638 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1639 # I=2, O=6 (fixed-length input < fixed-length output)
1640 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1641 # I=6, O=3 (fixed-length input > fixed-length output)
1642 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1643 # I=0, then 3; O=29, then 15 (with longer output)
1644 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1645 'a----------------------------.' +
1646 'b----------------------------.' +
1647 'cde--------------------------.' +
1648 'abcdefghijabcde.' +
1649 'a.b------------.' +
1650 '.c.------------.' +
1651 'd.e------------.' +
1652 'k--------------.' +
1653 'l--------------.' +
1654 'm--------------.')
1655 ]
1656
Antoine Pitrou19690592009-06-12 20:14:08 +00001657 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001658 # Try a few one-shot test cases.
1659 for input, eof, output in self.test_cases:
1660 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001661 self.assertEqual(d.decode(input, eof), output)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001662
1663 # Also test an unfinished decode, followed by forcing EOF.
1664 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001665 self.assertEqual(d.decode(b'oiabcd'), '')
1666 self.assertEqual(d.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001667
1668class TextIOWrapperTest(unittest.TestCase):
1669
1670 def setUp(self):
1671 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1672 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00001673 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001674
1675 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001676 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001677
Antoine Pitrou19690592009-06-12 20:14:08 +00001678 def test_constructor(self):
1679 r = self.BytesIO(b"\xc3\xa9\n\n")
1680 b = self.BufferedReader(r, 1000)
1681 t = self.TextIOWrapper(b)
1682 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti2623a372010-11-21 13:34:58 +00001683 self.assertEqual(t.encoding, "latin1")
1684 self.assertEqual(t.line_buffering, False)
Antoine Pitrou19690592009-06-12 20:14:08 +00001685 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti2623a372010-11-21 13:34:58 +00001686 self.assertEqual(t.encoding, "utf8")
1687 self.assertEqual(t.line_buffering, True)
1688 self.assertEqual("\xe9\n", t.readline())
Antoine Pitrou19690592009-06-12 20:14:08 +00001689 self.assertRaises(TypeError, t.__init__, b, newline=42)
1690 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1691
1692 def test_detach(self):
1693 r = self.BytesIO()
1694 b = self.BufferedWriter(r)
1695 t = self.TextIOWrapper(b)
1696 self.assertIs(t.detach(), b)
1697
1698 t = self.TextIOWrapper(b, encoding="ascii")
1699 t.write("howdy")
1700 self.assertFalse(r.getvalue())
1701 t.detach()
1702 self.assertEqual(r.getvalue(), b"howdy")
1703 self.assertRaises(ValueError, t.detach)
1704
1705 def test_repr(self):
1706 raw = self.BytesIO("hello".encode("utf-8"))
1707 b = self.BufferedReader(raw)
1708 t = self.TextIOWrapper(b, encoding="utf-8")
1709 modname = self.TextIOWrapper.__module__
1710 self.assertEqual(repr(t),
1711 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1712 raw.name = "dummy"
1713 self.assertEqual(repr(t),
1714 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1715 raw.name = b"dummy"
1716 self.assertEqual(repr(t),
1717 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1718
1719 def test_line_buffering(self):
1720 r = self.BytesIO()
1721 b = self.BufferedWriter(r, 1000)
1722 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1723 t.write("X")
Ezio Melotti2623a372010-11-21 13:34:58 +00001724 self.assertEqual(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001725 t.write("Y\nZ")
Ezio Melotti2623a372010-11-21 13:34:58 +00001726 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001727 t.write("A\rB")
Ezio Melotti2623a372010-11-21 13:34:58 +00001728 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001729
Antoine Pitrou19690592009-06-12 20:14:08 +00001730 def test_encoding(self):
1731 # Check the encoding attribute is always set, and valid
1732 b = self.BytesIO()
1733 t = self.TextIOWrapper(b, encoding="utf8")
1734 self.assertEqual(t.encoding, "utf8")
1735 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001736 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001737 codecs.lookup(t.encoding)
1738
1739 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001740 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001741 b = self.BytesIO(b"abc\n\xff\n")
1742 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001743 self.assertRaises(UnicodeError, t.read)
1744 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001745 b = self.BytesIO(b"abc\n\xff\n")
1746 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001747 self.assertRaises(UnicodeError, t.read)
1748 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001749 b = self.BytesIO(b"abc\n\xff\n")
1750 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti2623a372010-11-21 13:34:58 +00001751 self.assertEqual(t.read(), "abc\n\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001752 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001753 b = self.BytesIO(b"abc\n\xff\n")
1754 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti2623a372010-11-21 13:34:58 +00001755 self.assertEqual(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001756
Antoine Pitrou19690592009-06-12 20:14:08 +00001757 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001758 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001759 b = self.BytesIO()
1760 t = self.TextIOWrapper(b, encoding="ascii")
1761 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001762 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001763 b = self.BytesIO()
1764 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1765 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001766 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001767 b = self.BytesIO()
1768 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001769 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001770 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001771 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001772 self.assertEqual(b.getvalue(), b"abcdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001773 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001774 b = self.BytesIO()
1775 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001776 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001777 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001778 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001779 self.assertEqual(b.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001780
Antoine Pitrou19690592009-06-12 20:14:08 +00001781 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001782 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1783
1784 tests = [
1785 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1786 [ '', input_lines ],
1787 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1788 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1789 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1790 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001791 encodings = (
1792 'utf-8', 'latin-1',
1793 'utf-16', 'utf-16-le', 'utf-16-be',
1794 'utf-32', 'utf-32-le', 'utf-32-be',
1795 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00001796
1797 # Try a range of buffer sizes to test the case where \r is the last
1798 # character in TextIOWrapper._pending_line.
1799 for encoding in encodings:
1800 # XXX: str.encode() should return bytes
1801 data = bytes(''.join(input_lines).encode(encoding))
1802 for do_reads in (False, True):
1803 for bufsize in range(1, 10):
1804 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001805 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1806 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00001807 encoding=encoding)
1808 if do_reads:
1809 got_lines = []
1810 while True:
1811 c2 = textio.read(2)
1812 if c2 == '':
1813 break
Ezio Melotti2623a372010-11-21 13:34:58 +00001814 self.assertEqual(len(c2), 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001815 got_lines.append(c2 + textio.readline())
1816 else:
1817 got_lines = list(textio)
1818
1819 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti2623a372010-11-21 13:34:58 +00001820 self.assertEqual(got_line, exp_line)
1821 self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001822
Antoine Pitrou19690592009-06-12 20:14:08 +00001823 def test_newlines_input(self):
1824 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001825 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1826 for newline, expected in [
1827 (None, normalized.decode("ascii").splitlines(True)),
1828 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00001829 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1830 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1831 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00001832 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00001833 buf = self.BytesIO(testdata)
1834 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti2623a372010-11-21 13:34:58 +00001835 self.assertEqual(txt.readlines(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001836 txt.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001837 self.assertEqual(txt.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001838
Antoine Pitrou19690592009-06-12 20:14:08 +00001839 def test_newlines_output(self):
1840 testdict = {
1841 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1842 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1843 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1844 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1845 }
1846 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1847 for newline, expected in tests:
1848 buf = self.BytesIO()
1849 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1850 txt.write("AAA\nB")
1851 txt.write("BB\nCCC\n")
1852 txt.write("X\rY\r\nZ")
1853 txt.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001854 self.assertEqual(buf.closed, False)
1855 self.assertEqual(buf.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00001856
1857 def test_destructor(self):
1858 l = []
1859 base = self.BytesIO
1860 class MyBytesIO(base):
1861 def close(self):
1862 l.append(self.getvalue())
1863 base.close(self)
1864 b = MyBytesIO()
1865 t = self.TextIOWrapper(b, encoding="ascii")
1866 t.write("abc")
1867 del t
1868 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001869 self.assertEqual([b"abc"], l)
Antoine Pitrou19690592009-06-12 20:14:08 +00001870
1871 def test_override_destructor(self):
1872 record = []
1873 class MyTextIO(self.TextIOWrapper):
1874 def __del__(self):
1875 record.append(1)
1876 try:
1877 f = super(MyTextIO, self).__del__
1878 except AttributeError:
1879 pass
1880 else:
1881 f()
1882 def close(self):
1883 record.append(2)
1884 super(MyTextIO, self).close()
1885 def flush(self):
1886 record.append(3)
1887 super(MyTextIO, self).flush()
1888 b = self.BytesIO()
1889 t = MyTextIO(b, encoding="ascii")
1890 del t
1891 support.gc_collect()
1892 self.assertEqual(record, [1, 2, 3])
1893
1894 def test_error_through_destructor(self):
1895 # Test that the exception state is not modified by a destructor,
1896 # even if close() fails.
1897 rawio = self.CloseFailureIO()
1898 def f():
1899 self.TextIOWrapper(rawio).xyzzy
1900 with support.captured_output("stderr") as s:
1901 self.assertRaises(AttributeError, f)
1902 s = s.getvalue().strip()
1903 if s:
1904 # The destructor *may* have printed an unraisable error, check it
1905 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001906 self.assertTrue(s.startswith("Exception IOError: "), s)
1907 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001908
1909 # Systematic tests of the text I/O API
1910
Antoine Pitrou19690592009-06-12 20:14:08 +00001911 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001912 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1913 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00001914 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001915 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00001916 self.assertEqual(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001917 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001918 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001919 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00001920 self.assertEqual(f.tell(), 0)
1921 self.assertEqual(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001922 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00001923 self.assertEqual(f.seek(0), 0)
1924 self.assertEqual(f.read(None), "abc")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001925 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001926 self.assertEqual(f.read(2), "ab")
1927 self.assertEqual(f.read(1), "c")
1928 self.assertEqual(f.read(1), "")
1929 self.assertEqual(f.read(), "")
1930 self.assertEqual(f.tell(), cookie)
1931 self.assertEqual(f.seek(0), 0)
1932 self.assertEqual(f.seek(0, 2), cookie)
1933 self.assertEqual(f.write("def"), 3)
1934 self.assertEqual(f.seek(cookie), cookie)
1935 self.assertEqual(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001936 if enc.startswith("utf"):
1937 self.multi_line_test(f, enc)
1938 f.close()
1939
1940 def multi_line_test(self, f, enc):
1941 f.seek(0)
1942 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00001943 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001944 wlines = []
1945 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
1946 chars = []
1947 for i in range(size):
1948 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00001949 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001950 wlines.append((f.tell(), line))
1951 f.write(line)
1952 f.seek(0)
1953 rlines = []
1954 while True:
1955 pos = f.tell()
1956 line = f.readline()
1957 if not line:
1958 break
1959 rlines.append((pos, line))
Ezio Melotti2623a372010-11-21 13:34:58 +00001960 self.assertEqual(rlines, wlines)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001961
Antoine Pitrou19690592009-06-12 20:14:08 +00001962 def test_telling(self):
1963 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001964 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001965 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001966 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001967 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001968 p2 = f.tell()
1969 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001970 self.assertEqual(f.tell(), p0)
1971 self.assertEqual(f.readline(), "\xff\n")
1972 self.assertEqual(f.tell(), p1)
1973 self.assertEqual(f.readline(), "\xff\n")
1974 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001975 f.seek(0)
1976 for line in f:
Ezio Melotti2623a372010-11-21 13:34:58 +00001977 self.assertEqual(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001978 self.assertRaises(IOError, f.tell)
Ezio Melotti2623a372010-11-21 13:34:58 +00001979 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001980 f.close()
1981
Antoine Pitrou19690592009-06-12 20:14:08 +00001982 def test_seeking(self):
1983 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00001984 prefix_size = chunk_size - 2
1985 u_prefix = "a" * prefix_size
1986 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melotti2623a372010-11-21 13:34:58 +00001987 self.assertEqual(len(u_prefix), len(prefix))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001988 u_suffix = "\u8888\n"
1989 suffix = bytes(u_suffix.encode("utf-8"))
1990 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00001991 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001992 f.write(line*2)
1993 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001994 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001995 s = f.read(prefix_size)
Ezio Melotti2623a372010-11-21 13:34:58 +00001996 self.assertEqual(s, prefix.decode("ascii"))
1997 self.assertEqual(f.tell(), prefix_size)
1998 self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001999
Antoine Pitrou19690592009-06-12 20:14:08 +00002000 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002001 # Regression test for a specific bug
2002 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00002003 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002004 f.write(data)
2005 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002006 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002007 f._CHUNK_SIZE # Just test that it exists
2008 f._CHUNK_SIZE = 2
2009 f.readline()
2010 f.tell()
2011
Antoine Pitrou19690592009-06-12 20:14:08 +00002012 def test_seek_and_tell(self):
2013 #Test seek/tell using the StatefulIncrementalDecoder.
2014 # Make test faster by doing smaller seeks
2015 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00002016
Antoine Pitrou19690592009-06-12 20:14:08 +00002017 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002018 """Tell/seek to various points within a data stream and ensure
2019 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00002020 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00002021 f.write(data)
2022 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002023 f = self.open(support.TESTFN, encoding='test_decoder')
2024 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00002025 decoded = f.read()
2026 f.close()
2027
2028 for i in range(min_pos, len(decoded) + 1): # seek positions
2029 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00002030 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melotti2623a372010-11-21 13:34:58 +00002031 self.assertEqual(f.read(i), decoded[:i])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002032 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002033 self.assertEqual(f.read(j), decoded[i:i + j])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002034 f.seek(cookie)
Ezio Melotti2623a372010-11-21 13:34:58 +00002035 self.assertEqual(f.read(), decoded[i:])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002036 f.close()
2037
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002038 # Enable the test decoder.
2039 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00002040
2041 # Run the tests.
2042 try:
2043 # Try each test case.
2044 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00002045 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002046
2047 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00002048 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2049 offset = CHUNK_SIZE - len(input)//2
2050 prefix = b'.'*offset
2051 # Don't bother seeking into the prefix (takes too long).
2052 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00002053 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002054
2055 # Ensure our test decoder won't interfere with subsequent tests.
2056 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002057 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002058
Antoine Pitrou19690592009-06-12 20:14:08 +00002059 def test_encoded_writes(self):
2060 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002061 tests = ("utf-16",
2062 "utf-16-le",
2063 "utf-16-be",
2064 "utf-32",
2065 "utf-32-le",
2066 "utf-32-be")
2067 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002068 buf = self.BytesIO()
2069 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002070 # Check if the BOM is written only once (see issue1753).
2071 f.write(data)
2072 f.write(data)
2073 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002074 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002075 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002076 self.assertEqual(f.read(), data * 2)
2077 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002078
Antoine Pitrou19690592009-06-12 20:14:08 +00002079 def test_unreadable(self):
2080 class UnReadable(self.BytesIO):
2081 def readable(self):
2082 return False
2083 txt = self.TextIOWrapper(UnReadable())
2084 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002085
Antoine Pitrou19690592009-06-12 20:14:08 +00002086 def test_read_one_by_one(self):
2087 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002088 reads = ""
2089 while True:
2090 c = txt.read(1)
2091 if not c:
2092 break
2093 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002094 self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002095
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002096 def test_readlines(self):
2097 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2098 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2099 txt.seek(0)
2100 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2101 txt.seek(0)
2102 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2103
Christian Heimes1a6387e2008-03-26 12:49:49 +00002104 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002105 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002106 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002107 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002108 reads = ""
2109 while True:
2110 c = txt.read(128)
2111 if not c:
2112 break
2113 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002114 self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002115
2116 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002117 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002118
2119 # read one char at a time
2120 reads = ""
2121 while True:
2122 c = txt.read(1)
2123 if not c:
2124 break
2125 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002126 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002127
2128 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002129 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002130 txt._CHUNK_SIZE = 4
2131
2132 reads = ""
2133 while True:
2134 c = txt.read(4)
2135 if not c:
2136 break
2137 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002138 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002139
2140 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002141 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002142 txt._CHUNK_SIZE = 4
2143
2144 reads = txt.read(4)
2145 reads += txt.read(4)
2146 reads += txt.readline()
2147 reads += txt.readline()
2148 reads += txt.readline()
Ezio Melotti2623a372010-11-21 13:34:58 +00002149 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002150
2151 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002152 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002153 txt._CHUNK_SIZE = 4
2154
2155 reads = txt.read(4)
2156 reads += txt.read()
Ezio Melotti2623a372010-11-21 13:34:58 +00002157 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002158
2159 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002160 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002161 txt._CHUNK_SIZE = 4
2162
2163 reads = txt.read(4)
2164 pos = txt.tell()
2165 txt.seek(0)
2166 txt.seek(pos)
Ezio Melotti2623a372010-11-21 13:34:58 +00002167 self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002168
2169 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002170 buffer = self.BytesIO(self.testdata)
2171 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002172
2173 self.assertEqual(buffer.seekable(), txt.seekable())
2174
Antoine Pitrou19690592009-06-12 20:14:08 +00002175 def test_append_bom(self):
2176 # The BOM is not written again when appending to a non-empty file
2177 filename = support.TESTFN
2178 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2179 with self.open(filename, 'w', encoding=charset) as f:
2180 f.write('aaa')
2181 pos = f.tell()
2182 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002183 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002184
2185 with self.open(filename, 'a', encoding=charset) as f:
2186 f.write('xxx')
2187 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002188 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002189
Antoine Pitrou19690592009-06-12 20:14:08 +00002190 def test_seek_bom(self):
2191 # Same test, but when seeking manually
2192 filename = support.TESTFN
2193 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2194 with self.open(filename, 'w', encoding=charset) as f:
2195 f.write('aaa')
2196 pos = f.tell()
2197 with self.open(filename, 'r+', encoding=charset) as f:
2198 f.seek(pos)
2199 f.write('zzz')
2200 f.seek(0)
2201 f.write('bbb')
2202 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002203 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002204
2205 def test_errors_property(self):
2206 with self.open(support.TESTFN, "w") as f:
2207 self.assertEqual(f.errors, "strict")
2208 with self.open(support.TESTFN, "w", errors="replace") as f:
2209 self.assertEqual(f.errors, "replace")
2210
Victor Stinner6a102812010-04-27 23:55:59 +00002211 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002212 def test_threads_write(self):
2213 # Issue6750: concurrent writes could duplicate data
2214 event = threading.Event()
2215 with self.open(support.TESTFN, "w", buffering=1) as f:
2216 def run(n):
2217 text = "Thread%03d\n" % n
2218 event.wait()
2219 f.write(text)
2220 threads = [threading.Thread(target=lambda n=x: run(n))
2221 for x in range(20)]
2222 for t in threads:
2223 t.start()
2224 time.sleep(0.02)
2225 event.set()
2226 for t in threads:
2227 t.join()
2228 with self.open(support.TESTFN) as f:
2229 content = f.read()
2230 for n in range(20):
Ezio Melotti2623a372010-11-21 13:34:58 +00002231 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002232
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002233 def test_flush_error_on_close(self):
2234 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2235 def bad_flush():
2236 raise IOError()
2237 txt.flush = bad_flush
2238 self.assertRaises(IOError, txt.close) # exception not swallowed
2239
2240 def test_multi_close(self):
2241 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2242 txt.close()
2243 txt.close()
2244 txt.close()
2245 self.assertRaises(ValueError, txt.flush)
2246
Antoine Pitroufc9ead62010-12-21 21:26:55 +00002247 def test_readonly_attributes(self):
2248 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2249 buf = self.BytesIO(self.testdata)
2250 with self.assertRaises((AttributeError, TypeError)):
2251 txt.buffer = buf
2252
Antoine Pitrou19690592009-06-12 20:14:08 +00002253class CTextIOWrapperTest(TextIOWrapperTest):
2254
2255 def test_initialization(self):
2256 r = self.BytesIO(b"\xc3\xa9\n\n")
2257 b = self.BufferedReader(r, 1000)
2258 t = self.TextIOWrapper(b)
2259 self.assertRaises(TypeError, t.__init__, b, newline=42)
2260 self.assertRaises(ValueError, t.read)
2261 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2262 self.assertRaises(ValueError, t.read)
2263
2264 def test_garbage_collection(self):
2265 # C TextIOWrapper objects are collected, and collecting them flushes
2266 # all data to disk.
2267 # The Python version has __del__, so it ends in gc.garbage instead.
2268 rawio = io.FileIO(support.TESTFN, "wb")
2269 b = self.BufferedWriter(rawio)
2270 t = self.TextIOWrapper(b, encoding="ascii")
2271 t.write("456def")
2272 t.x = t
2273 wr = weakref.ref(t)
2274 del t
2275 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002276 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002277 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002278 self.assertEqual(f.read(), b"456def")
2279
2280class PyTextIOWrapperTest(TextIOWrapperTest):
2281 pass
2282
2283
2284class IncrementalNewlineDecoderTest(unittest.TestCase):
2285
2286 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002287 # UTF-8 specific tests for a newline decoder
2288 def _check_decode(b, s, **kwargs):
2289 # We exercise getstate() / setstate() as well as decode()
2290 state = decoder.getstate()
Ezio Melotti2623a372010-11-21 13:34:58 +00002291 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002292 decoder.setstate(state)
Ezio Melotti2623a372010-11-21 13:34:58 +00002293 self.assertEqual(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002294
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002295 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002296
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002297 _check_decode(b'\xe8', "")
2298 _check_decode(b'\xa2', "")
2299 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002300
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002301 _check_decode(b'\xe8', "")
2302 _check_decode(b'\xa2', "")
2303 _check_decode(b'\x88', "\u8888")
2304
2305 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002306 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2307
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002308 decoder.reset()
2309 _check_decode(b'\n', "\n")
2310 _check_decode(b'\r', "")
2311 _check_decode(b'', "\n", final=True)
2312 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002313
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002314 _check_decode(b'\r', "")
2315 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002316
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002317 _check_decode(b'\r\r\n', "\n\n")
2318 _check_decode(b'\r', "")
2319 _check_decode(b'\r', "\n")
2320 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002321
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002322 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2323 _check_decode(b'\xe8\xa2\x88', "\u8888")
2324 _check_decode(b'\n', "\n")
2325 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2326 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002327
Antoine Pitrou19690592009-06-12 20:14:08 +00002328 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002329 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002330 if encoding is not None:
2331 encoder = codecs.getincrementalencoder(encoding)()
2332 def _decode_bytewise(s):
2333 # Decode one byte at a time
2334 for b in encoder.encode(s):
2335 result.append(decoder.decode(b))
2336 else:
2337 encoder = None
2338 def _decode_bytewise(s):
2339 # Decode one char at a time
2340 for c in s:
2341 result.append(decoder.decode(c))
Ezio Melotti2623a372010-11-21 13:34:58 +00002342 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002343 _decode_bytewise("abc\n\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002344 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002345 _decode_bytewise("\nabc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002346 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002347 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002348 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002349 _decode_bytewise("abc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002350 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002351 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002352 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002353 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002354 input = "abc"
2355 if encoder is not None:
2356 encoder.reset()
2357 input = encoder.encode(input)
Ezio Melotti2623a372010-11-21 13:34:58 +00002358 self.assertEqual(decoder.decode(input), "abc")
2359 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002360
2361 def test_newline_decoder(self):
2362 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002363 # None meaning the IncrementalNewlineDecoder takes unicode input
2364 # rather than bytes input
2365 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002366 'utf-16', 'utf-16-le', 'utf-16-be',
2367 'utf-32', 'utf-32-le', 'utf-32-be',
2368 )
2369 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002370 decoder = enc and codecs.getincrementaldecoder(enc)()
2371 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2372 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002373 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002374 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2375 self.check_newline_decoding_utf8(decoder)
2376
2377 def test_newline_bytes(self):
2378 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2379 def _check(dec):
Ezio Melotti2623a372010-11-21 13:34:58 +00002380 self.assertEqual(dec.newlines, None)
2381 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2382 self.assertEqual(dec.newlines, None)
2383 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2384 self.assertEqual(dec.newlines, None)
Antoine Pitrou19690592009-06-12 20:14:08 +00002385 dec = self.IncrementalNewlineDecoder(None, translate=False)
2386 _check(dec)
2387 dec = self.IncrementalNewlineDecoder(None, translate=True)
2388 _check(dec)
2389
2390class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2391 pass
2392
2393class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2394 pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002395
Christian Heimes1a6387e2008-03-26 12:49:49 +00002396
2397# XXX Tests for open()
2398
2399class MiscIOTest(unittest.TestCase):
2400
Benjamin Petersonad100c32008-11-20 22:06:22 +00002401 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002402 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002403
Antoine Pitrou19690592009-06-12 20:14:08 +00002404 def test___all__(self):
2405 for name in self.io.__all__:
2406 obj = getattr(self.io, name, None)
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002407 self.assertTrue(obj is not None, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002408 if name == "open":
2409 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002410 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002411 self.assertTrue(issubclass(obj, Exception), name)
2412 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002413 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002414
Benjamin Petersonad100c32008-11-20 22:06:22 +00002415 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002416 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002417 self.assertEqual(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002418 f.close()
2419
Antoine Pitrou19690592009-06-12 20:14:08 +00002420 f = self.open(support.TESTFN, "U")
Ezio Melotti2623a372010-11-21 13:34:58 +00002421 self.assertEqual(f.name, support.TESTFN)
2422 self.assertEqual(f.buffer.name, support.TESTFN)
2423 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2424 self.assertEqual(f.mode, "U")
2425 self.assertEqual(f.buffer.mode, "rb")
2426 self.assertEqual(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002427 f.close()
2428
Antoine Pitrou19690592009-06-12 20:14:08 +00002429 f = self.open(support.TESTFN, "w+")
Ezio Melotti2623a372010-11-21 13:34:58 +00002430 self.assertEqual(f.mode, "w+")
2431 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2432 self.assertEqual(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002433
Antoine Pitrou19690592009-06-12 20:14:08 +00002434 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melotti2623a372010-11-21 13:34:58 +00002435 self.assertEqual(g.mode, "wb")
2436 self.assertEqual(g.raw.mode, "wb")
2437 self.assertEqual(g.name, f.fileno())
2438 self.assertEqual(g.raw.name, f.fileno())
Benjamin Petersonad100c32008-11-20 22:06:22 +00002439 f.close()
2440 g.close()
2441
Antoine Pitrou19690592009-06-12 20:14:08 +00002442 def test_io_after_close(self):
2443 for kwargs in [
2444 {"mode": "w"},
2445 {"mode": "wb"},
2446 {"mode": "w", "buffering": 1},
2447 {"mode": "w", "buffering": 2},
2448 {"mode": "wb", "buffering": 0},
2449 {"mode": "r"},
2450 {"mode": "rb"},
2451 {"mode": "r", "buffering": 1},
2452 {"mode": "r", "buffering": 2},
2453 {"mode": "rb", "buffering": 0},
2454 {"mode": "w+"},
2455 {"mode": "w+b"},
2456 {"mode": "w+", "buffering": 1},
2457 {"mode": "w+", "buffering": 2},
2458 {"mode": "w+b", "buffering": 0},
2459 ]:
2460 f = self.open(support.TESTFN, **kwargs)
2461 f.close()
2462 self.assertRaises(ValueError, f.flush)
2463 self.assertRaises(ValueError, f.fileno)
2464 self.assertRaises(ValueError, f.isatty)
2465 self.assertRaises(ValueError, f.__iter__)
2466 if hasattr(f, "peek"):
2467 self.assertRaises(ValueError, f.peek, 1)
2468 self.assertRaises(ValueError, f.read)
2469 if hasattr(f, "read1"):
2470 self.assertRaises(ValueError, f.read1, 1024)
2471 if hasattr(f, "readinto"):
2472 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2473 self.assertRaises(ValueError, f.readline)
2474 self.assertRaises(ValueError, f.readlines)
2475 self.assertRaises(ValueError, f.seek, 0)
2476 self.assertRaises(ValueError, f.tell)
2477 self.assertRaises(ValueError, f.truncate)
2478 self.assertRaises(ValueError, f.write,
2479 b"" if "b" in kwargs['mode'] else "")
2480 self.assertRaises(ValueError, f.writelines, [])
2481 self.assertRaises(ValueError, next, f)
2482
2483 def test_blockingioerror(self):
2484 # Various BlockingIOError issues
2485 self.assertRaises(TypeError, self.BlockingIOError)
2486 self.assertRaises(TypeError, self.BlockingIOError, 1)
2487 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2488 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2489 b = self.BlockingIOError(1, "")
2490 self.assertEqual(b.characters_written, 0)
2491 class C(unicode):
2492 pass
2493 c = C("")
2494 b = self.BlockingIOError(1, c)
2495 c.b = b
2496 b.c = c
2497 wr = weakref.ref(c)
2498 del c, b
2499 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002500 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002501
2502 def test_abcs(self):
2503 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002504 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2505 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2506 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2507 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002508
2509 def _check_abc_inheritance(self, abcmodule):
2510 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002511 self.assertIsInstance(f, abcmodule.IOBase)
2512 self.assertIsInstance(f, abcmodule.RawIOBase)
2513 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2514 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002515 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002516 self.assertIsInstance(f, abcmodule.IOBase)
2517 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2518 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2519 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002520 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002521 self.assertIsInstance(f, abcmodule.IOBase)
2522 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2523 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2524 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002525
2526 def test_abc_inheritance(self):
2527 # Test implementations inherit from their respective ABCs
2528 self._check_abc_inheritance(self)
2529
2530 def test_abc_inheritance_official(self):
2531 # Test implementations inherit from the official ABCs of the
2532 # baseline "io" module.
2533 self._check_abc_inheritance(io)
2534
2535class CMiscIOTest(MiscIOTest):
2536 io = io
2537
2538class PyMiscIOTest(MiscIOTest):
2539 io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002540
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002541
2542@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2543class SignalsTest(unittest.TestCase):
2544
2545 def setUp(self):
2546 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2547
2548 def tearDown(self):
2549 signal.signal(signal.SIGALRM, self.oldalrm)
2550
2551 def alarm_interrupt(self, sig, frame):
Florent Xicluna3fa3b002010-09-13 08:20:19 +00002552 1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002553
2554 @unittest.skipUnless(threading, 'Threading required for this test.')
2555 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2556 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou6439c002011-02-25 21:35:47 +00002557 invokes the signal handler, and bubbles up the exception raised
2558 in the latter."""
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002559 read_results = []
2560 def _read():
2561 s = os.read(r, 1)
2562 read_results.append(s)
2563 t = threading.Thread(target=_read)
2564 t.daemon = True
2565 r, w = os.pipe()
2566 try:
2567 wio = self.io.open(w, **fdopen_kwargs)
2568 t.start()
2569 signal.alarm(1)
2570 # Fill the pipe enough that the write will be blocking.
2571 # It will be interrupted by the timer armed above. Since the
2572 # other thread has read one byte, the low-level write will
2573 # return with a successful (partial) result rather than an EINTR.
2574 # The buffered IO layer must check for pending signal
2575 # handlers, which in this case will invoke alarm_interrupt().
2576 self.assertRaises(ZeroDivisionError,
2577 wio.write, item * (1024 * 1024))
2578 t.join()
2579 # We got one byte, get another one and check that it isn't a
2580 # repeat of the first one.
2581 read_results.append(os.read(r, 1))
2582 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2583 finally:
2584 os.close(w)
2585 os.close(r)
2586 # This is deliberate. If we didn't close the file descriptor
2587 # before closing wio, wio would try to flush its internal
2588 # buffer, and block again.
2589 try:
2590 wio.close()
2591 except IOError as e:
2592 if e.errno != errno.EBADF:
2593 raise
2594
2595 def test_interrupted_write_unbuffered(self):
2596 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2597
2598 def test_interrupted_write_buffered(self):
2599 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2600
2601 def test_interrupted_write_text(self):
2602 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2603
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002604 def check_reentrant_write(self, data, **fdopen_kwargs):
2605 def on_alarm(*args):
2606 # Will be called reentrantly from the same thread
2607 wio.write(data)
2608 1/0
2609 signal.signal(signal.SIGALRM, on_alarm)
2610 r, w = os.pipe()
2611 wio = self.io.open(w, **fdopen_kwargs)
2612 try:
2613 signal.alarm(1)
2614 # Either the reentrant call to wio.write() fails with RuntimeError,
2615 # or the signal handler raises ZeroDivisionError.
2616 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2617 while 1:
2618 for i in range(100):
2619 wio.write(data)
2620 wio.flush()
2621 # Make sure the buffer doesn't fill up and block further writes
2622 os.read(r, len(data) * 100)
2623 exc = cm.exception
2624 if isinstance(exc, RuntimeError):
2625 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2626 finally:
2627 wio.close()
2628 os.close(r)
2629
2630 def test_reentrant_write_buffered(self):
2631 self.check_reentrant_write(b"xy", mode="wb")
2632
2633 def test_reentrant_write_text(self):
2634 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2635
Antoine Pitrou6439c002011-02-25 21:35:47 +00002636 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2637 """Check that a buffered read, when it gets interrupted (either
2638 returning a partial result or EINTR), properly invokes the signal
2639 handler and retries if the latter returned successfully."""
2640 r, w = os.pipe()
2641 fdopen_kwargs["closefd"] = False
2642 def alarm_handler(sig, frame):
2643 os.write(w, b"bar")
2644 signal.signal(signal.SIGALRM, alarm_handler)
2645 try:
2646 rio = self.io.open(r, **fdopen_kwargs)
2647 os.write(w, b"foo")
2648 signal.alarm(1)
2649 # Expected behaviour:
2650 # - first raw read() returns partial b"foo"
2651 # - second raw read() returns EINTR
2652 # - third raw read() returns b"bar"
2653 self.assertEqual(decode(rio.read(6)), "foobar")
2654 finally:
2655 rio.close()
2656 os.close(w)
2657 os.close(r)
2658
2659 def test_interrupterd_read_retry_buffered(self):
2660 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
2661 mode="rb")
2662
2663 def test_interrupterd_read_retry_text(self):
2664 self.check_interrupted_read_retry(lambda x: x,
2665 mode="r")
2666
2667 @unittest.skipUnless(threading, 'Threading required for this test.')
2668 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
2669 """Check that a buffered write, when it gets interrupted (either
2670 returning a partial result or EINTR), properly invokes the signal
2671 handler and retries if the latter returned successfully."""
2672 select = support.import_module("select")
2673 # A quantity that exceeds the buffer size of an anonymous pipe's
2674 # write end.
2675 N = 1024 * 1024
2676 r, w = os.pipe()
2677 fdopen_kwargs["closefd"] = False
2678 # We need a separate thread to read from the pipe and allow the
2679 # write() to finish. This thread is started after the SIGALRM is
2680 # received (forcing a first EINTR in write()).
2681 read_results = []
2682 write_finished = False
2683 def _read():
2684 while not write_finished:
2685 while r in select.select([r], [], [], 1.0)[0]:
2686 s = os.read(r, 1024)
2687 read_results.append(s)
2688 t = threading.Thread(target=_read)
2689 t.daemon = True
2690 def alarm1(sig, frame):
2691 signal.signal(signal.SIGALRM, alarm2)
2692 signal.alarm(1)
2693 def alarm2(sig, frame):
2694 t.start()
2695 signal.signal(signal.SIGALRM, alarm1)
2696 try:
2697 wio = self.io.open(w, **fdopen_kwargs)
2698 signal.alarm(1)
2699 # Expected behaviour:
2700 # - first raw write() is partial (because of the limited pipe buffer
2701 # and the first alarm)
2702 # - second raw write() returns EINTR (because of the second alarm)
2703 # - subsequent write()s are successful (either partial or complete)
2704 self.assertEqual(N, wio.write(item * N))
2705 wio.flush()
2706 write_finished = True
2707 t.join()
2708 self.assertEqual(N, sum(len(x) for x in read_results))
2709 finally:
2710 write_finished = True
2711 os.close(w)
2712 os.close(r)
2713 # This is deliberate. If we didn't close the file descriptor
2714 # before closing wio, wio would try to flush its internal
2715 # buffer, and could block (in case of failure).
2716 try:
2717 wio.close()
2718 except IOError as e:
2719 if e.errno != errno.EBADF:
2720 raise
2721
2722 def test_interrupterd_write_retry_buffered(self):
2723 self.check_interrupted_write_retry(b"x", mode="wb")
2724
2725 def test_interrupterd_write_retry_text(self):
2726 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
2727
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002728
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002729class CSignalsTest(SignalsTest):
2730 io = io
2731
2732class PySignalsTest(SignalsTest):
2733 io = pyio
2734
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002735 # Handling reentrancy issues would slow down _pyio even more, so the
2736 # tests are disabled.
2737 test_reentrant_write_buffered = None
2738 test_reentrant_write_text = None
2739
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002740
Christian Heimes1a6387e2008-03-26 12:49:49 +00002741def test_main():
Antoine Pitrou19690592009-06-12 20:14:08 +00002742 tests = (CIOTest, PyIOTest,
2743 CBufferedReaderTest, PyBufferedReaderTest,
2744 CBufferedWriterTest, PyBufferedWriterTest,
2745 CBufferedRWPairTest, PyBufferedRWPairTest,
2746 CBufferedRandomTest, PyBufferedRandomTest,
2747 StatefulIncrementalDecoderTest,
2748 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2749 CTextIOWrapperTest, PyTextIOWrapperTest,
2750 CMiscIOTest, PyMiscIOTest,
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002751 CSignalsTest, PySignalsTest,
Antoine Pitrou19690592009-06-12 20:14:08 +00002752 )
2753
2754 # Put the namespaces of the IO module we are testing and some useful mock
2755 # classes in the __dict__ of each test.
2756 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou6391b342010-09-14 18:48:19 +00002757 MockNonBlockWriterIO, MockRawIOWithoutRead)
Antoine Pitrou19690592009-06-12 20:14:08 +00002758 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2759 c_io_ns = dict((name, getattr(io, name)) for name in all_members)
2760 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
2761 globs = globals()
2762 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2763 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2764 # Avoid turning open into a bound method.
2765 py_io_ns["open"] = pyio.OpenWrapper
2766 for test in tests:
2767 if test.__name__.startswith("C"):
2768 for name, obj in c_io_ns.items():
2769 setattr(test, name, obj)
2770 elif test.__name__.startswith("Py"):
2771 for name, obj in py_io_ns.items():
2772 setattr(test, name, obj)
2773
2774 support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002775
2776if __name__ == "__main__":
Antoine Pitrou19690592009-06-12 20:14:08 +00002777 test_main()